
Extract audit logs from RDS PostgreSQL or Aurora PostgreSQL log files exported to CloudWatch Logs into CSV files and upload them to an Amazon S3 bucket.

9 minute read
Content level: Intermediate

This article shows how to extract audit information from RDS PostgreSQL/Aurora PostgreSQL log files exported to CloudWatch Logs and upload that data to an Amazon S3 bucket in CSV format, keeping each day's audit information in a single CSV file.

Introduction

With the current process, audit information for RDS PostgreSQL and Aurora PostgreSQL is written to the PostgreSQL log file, which can be exported to CloudWatch Logs. Reading audit information directly from the log file is cumbersome because it contains a large amount of general database logging alongside the audit entries.

This document provides a comprehensive guide to exporting RDS PostgreSQL or Aurora PostgreSQL audit logs from CloudWatch Logs to Amazon S3 in CSV format. The solution automatically processes audit logs in near real-time, making them easily accessible and queryable for compliance, security analysis, and auditing purposes. With this approach, audit data is extracted to CSV and appended to a single file per day.

Benefits of Exporting to S3

Exporting audit logs to S3 offers several advantages:

  • Cost-effective long-term storage
  • Structured format (CSV) for easy analysis
  • Integration with analytics services (Athena, Redshift, etc.)
  • Immutable storage for compliance requirements
  • Lifecycle policies for archiving older logs
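The last point can be automated with an S3 lifecycle configuration. As a minimal sketch, the function below builds a rule set that transitions audit logs to Glacier after 90 days and expires them after a year; the prefix, retention periods, and bucket name are example values to adapt, not part of the solution above:

```python
def audit_log_lifecycle_rules(prefix='audit_logs/'):
    """Example lifecycle rules: archive after 90 days, delete after 365."""
    return [{
        'ID': 'archive-audit-logs',
        'Filter': {'Prefix': prefix},           # only the exported audit logs
        'Status': 'Enabled',
        'Transitions': [{'Days': 90, 'StorageClass': 'GLACIER'}],
        'Expiration': {'Days': 365},
    }]

# Usage (requires boto3 and AWS credentials):
#   import boto3
#   boto3.client('s3').put_bucket_lifecycle_configuration(
#       Bucket='<YOUR_BUCKET_NAME>',
#       LifecycleConfiguration={'Rules': audit_log_lifecycle_rules()})
```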

Architecture

  1. RDS PostgreSQL/Aurora PostgreSQL generates audit logs, which are sent to CloudWatch Logs when export to CloudWatch is enabled
  2. The Subscription Filter identifies audit log entries and invokes the Lambda function
  3. Lambda parses the logs, extracts structured data, and converts to CSV format
  4. Lambda appends the new log entries to a daily CSV file in S3
  5. Files are organized in S3 using a YYYY/MM/DD folder structure
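To illustrate step 3, pgAudit entries embedded in the PostgreSQL log have a comma-separated structure that can be pulled apart with regular expressions. The sample line below is hypothetical, but follows the pgAudit SESSION row format that the Lambda function in Step 2 parses:

```python
import re

# Hypothetical pgAudit SESSION entry as it appears in the PostgreSQL log
sample = ('2025-07-06 05:55:03 UTC:10.0.0.5(52434):postgres@postgres:[12345]:LOG:  '
          'AUDIT: SESSION,1,1,WRITE,INSERT,TABLE,public.test1,'
          '"insert into test1 values (6);",<none>')

# user@database appears in the log line prefix
user_db = re.search(r'(\w+)@(\w+):', sample)

# pgAudit fields: audit_type,statement_id,substatement_id,class,command,
# object_type,object_name,statement,parameter
audit = re.search(
    r'AUDIT:\s+([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),(.*?)(?:,<none>|$)',
    sample)

print(user_db.group(1), user_db.group(2))              # user and database
print(audit.group(5), audit.group(6), audit.group(7))  # command, object type, object name
print(audit.group(8).strip('"'))                       # the SQL statement itself
```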

Prerequisites

  1. Make sure auditing is enabled on your RDS PostgreSQL or Aurora PostgreSQL instance/cluster and that the pgAudit extension has been created in the database. If it is not enabled yet, enable auditing first.

  2. Make sure export of the postgresql.log file to CloudWatch Logs is enabled. If not, enable exporting the PostgreSQL log to CloudWatch.

  3. A CloudWatch Logs log group receiving the PostgreSQL logs

  4. An S3 bucket for storing the processed logs

  5. IAM permissions to create Lambda functions and CloudWatch Logs subscription filters
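If auditing is not enabled yet, the relevant parameters can be set on a custom DB parameter group, for example with boto3 as sketched below. The parameter group name is a placeholder, and the pgaudit.log classes ("write,ddl" here) are example values to match to your own audit requirements; shared_preload_libraries is a static parameter and takes effect only after a reboot:

```python
def enable_pgaudit(rds_client, parameter_group_name):
    """Sketch: set the parameters pgAudit needs on a custom DB parameter group."""
    return rds_client.modify_db_parameter_group(
        DBParameterGroupName=parameter_group_name,
        Parameters=[
            # Static parameter: applied only after the instance is rebooted
            {'ParameterName': 'shared_preload_libraries',
             'ParameterValue': 'pgaudit',
             'ApplyMethod': 'pending-reboot'},
            # Statement classes to audit (example: write and DDL activity)
            {'ParameterName': 'pgaudit.log',
             'ParameterValue': 'write,ddl',
             'ApplyMethod': 'immediate'},
        ],
    )

# Usage (requires boto3 and AWS credentials):
#   import boto3
#   enable_pgaudit(boto3.client('rds'), '<YOUR_PARAMETER_GROUP>')
```

After the parameters are applied (and the instance rebooted for the static one), run CREATE EXTENSION pgaudit; in the database.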

Step 1: Create IAM Role for Lambda

Create an IAM role with permissions to:

  • Write logs to CloudWatch Logs
  • Read from CloudWatch Logs
  • Read and write objects in the target S3 bucket
# Create IAM role with trust policy
aws iam create-role \
  --role-name PostgreSQLAuditLogProcessorRole \
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": "lambda.amazonaws.com"
        },
        "Action": "sts:AssumeRole"
      }
    ]
  }'

# Attach policy to IAM role
aws iam put-role-policy \
  --role-name PostgreSQLAuditLogProcessorRole \
  --policy-name LambdaPolicy \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ],
        "Resource": "arn:aws:logs:*:*:*"
      },
      {
        "Effect": "Allow",
        "Action": [
          "s3:PutObject",
          "s3:GetObject",
          "s3:ListBucket"
        ],
        "Resource": [
          "arn:aws:s3:::<YOUR_BUCKET_NAME>/*",
          "arn:aws:s3:::<YOUR_BUCKET_NAME>"
        ]
      }
    ]
  }'

Step 2: Create Lambda Function

Create a Lambda function to process the audit logs:

  1. Create a Python file named postgresql_audit_log_processor.py with the following code:
import base64, gzip, json, io, csv, re, traceback
import boto3
from datetime import datetime
from botocore.exceptions import ClientError

# Initialize S3 client and constants
s3 = boto3.client('s3')
TARGET_BUCKET = '<YOUR_BUCKET_NAME>'
TARGET_PREFIX = 'audit_logs/'
CSV_HEADERS = ['timestamp', 'log_level', 'user', 'database', 'action', 'object_type', 'object_name', 'query']

def parse_postgresql_log(log_event):
    """Parse PostgreSQL log entry into structured data"""
    try:
        message = log_event.get('message', '')
        if 'AUDIT:' not in message:
            return None
            
        # Extract timestamp and initialize log data
        timestamp = datetime.fromtimestamp(log_event.get('timestamp', 0) / 1000.0).isoformat()
        log_data = {
            'timestamp': timestamp, 'log_level': 'INFO', 'user': 'unknown',
            'database': 'unknown', 'action': 'unknown', 'object_type': 'unknown',
            'object_name': 'unknown', 'query': ''
        }
        
        # Extract username and database
        user_db_match = re.search(r'(\w+)@(\w+):', message)
        if user_db_match:
            log_data['user'] = user_db_match.group(1)
            log_data['database'] = user_db_match.group(2)
        
        # Extract audit details
        audit_match = re.search(r'AUDIT:\s+([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),(.*?)(?:,<none>|$)', message)
        if audit_match:
            log_data['action'] = audit_match.group(4)
            log_data['object_type'] = audit_match.group(6)
            log_data['object_name'] = audit_match.group(7)
            
            # Clean up query
            query = audit_match.group(8).strip('"').replace('\\"', '"').replace('\\n', ' ').replace('\\t', ' ')
            log_data['query'] = query
        
        # Alternative parsing for TABLE operations
        if not log_data['query'] and ',TABLE,' in message:
            parts = message.split(',TABLE,')
            if len(parts) > 1:
                table_and_query = parts[1].split(',', 1)
                if len(table_and_query) > 1:
                    log_data['object_type'] = 'TABLE'
                    log_data['object_name'] = table_and_query[0]
                    query_part = table_and_query[1].split(',<none>')[0] if ',<none>' in table_and_query[1] else table_and_query[1]
                    log_data['query'] = query_part.strip('"')
        
        # Determine action from query if not already set
        if log_data['action'] in ['unknown', 'WRITE']:
            for action in ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'CREATE', 'ALTER', 'DROP']:
                if action in message:
                    log_data['action'] = action
                    break
        
        return log_data
    except Exception as e:
        print(f"Error parsing log: {str(e)}")
        return None

def file_exists(bucket, key):
    """Check if a file exists in S3"""
    try:
        s3.head_object(Bucket=bucket, Key=key)
        return True
    except ClientError as e:
        if e.response['Error']['Code'] == '404':
            return False
        raise

def append_to_csv_file(bucket, key, csv_rows):
    """Append rows to an existing CSV file or create a new one if it doesn't exist"""
    try:
        # Create CSV content for new rows
        csv_buffer = io.StringIO()
        writer = csv.DictWriter(csv_buffer, fieldnames=CSV_HEADERS)
        
        # Check if file exists
        exists = file_exists(bucket, key)
        if not exists:
            writer.writeheader()
        
        # Write rows
        for row in csv_rows:
            writer.writerow(row)
        
        # If file exists, append to it
        if exists:
            try:
                # Get existing content
                response = s3.get_object(Bucket=bucket, Key=key)
                existing_content = response['Body'].read().decode('utf-8')
                
                # Prepare new content (no header row is written in the append
                # case, but strip one defensively if present)
                new_content = csv_buffer.getvalue()
                if new_content.startswith(CSV_HEADERS[0]):
                    new_content = new_content.split('\n', 1)[1] if '\n' in new_content else ''
                
                # Ensure newline at end of existing content
                if existing_content and not existing_content.endswith('\n'):
                    existing_content += '\n'
                
                # Upload combined content
                s3.put_object(Bucket=bucket, Key=key, Body=existing_content + new_content, ContentType='text/csv')
            except Exception as e:
                # Fall back to creating new file on error
                print(f"Error appending, creating new file: {str(e)}")
                s3.put_object(Bucket=bucket, Key=key, Body=csv_buffer.getvalue(), ContentType='text/csv')
        else:
            # Create new file
            s3.put_object(Bucket=bucket, Key=key, Body=csv_buffer.getvalue(), ContentType='text/csv')
    except Exception as e:
        print(f"Error in append_to_csv_file: {str(e)}")
        raise

def lambda_handler(event, context):
    try:
        # Handle CloudWatch Logs events
        if 'awslogs' in event:
            # Decode and decompress logs
            compressed_payload = base64.b64decode(event['awslogs']['data'])
            log_data = json.loads(gzip.decompress(compressed_payload))
            
            # Process log events
            csv_rows = []
            for log_event in log_data['logEvents']:
                parsed_log = parse_postgresql_log(log_event)
                if parsed_log:
                    csv_rows.append(parsed_log)
            
            # If we have logs to process
            if csv_rows:
                date_prefix = datetime.now().strftime('%Y/%m/%d/')
                s3_key = f"{TARGET_PREFIX}{date_prefix}audit_logs_daily.csv"
                append_to_csv_file(TARGET_BUCKET, s3_key, csv_rows)
                return {'statusCode': 200, 'body': f"Processed {len(csv_rows)} audit log entries"}
            else:
                return {'statusCode': 200, 'body': "No audit logs to process"}
        
        return {'statusCode': 400, 'body': "Invalid event format"}
    except Exception as e:
        print(f"Error: {str(e)}")
        print(traceback.format_exc())
        raise
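Before packaging, the handler can be exercised locally with a synthetic CloudWatch Logs event. Subscription payloads arrive gzip-compressed and base64-encoded under awslogs.data, which the following sketch reproduces; the AUDIT message is a made-up sample, and actually invoking the handler requires AWS credentials and the target bucket:

```python
import base64, gzip, json, time

def make_cloudwatch_event(messages):
    """Wrap raw log lines the way CloudWatch Logs delivers them to Lambda."""
    payload = {
        'messageType': 'DATA_MESSAGE',
        'logGroup': '/aws/rds/cluster/example/postgresql',   # placeholder names
        'logStream': 'example-instance',
        'logEvents': [
            {'id': str(i), 'timestamp': int(time.time() * 1000), 'message': m}
            for i, m in enumerate(messages)
        ],
    }
    compressed = gzip.compress(json.dumps(payload).encode('utf-8'))
    return {'awslogs': {'data': base64.b64encode(compressed).decode('utf-8')}}

event = make_cloudwatch_event([
    'postgres@postgres:LOG: AUDIT: SESSION,1,1,WRITE,INSERT,TABLE,public.test1,'
    '"insert into test1 values (6);",<none>'
])

# from postgresql_audit_log_processor import lambda_handler
# lambda_handler(event, None)   # requires AWS credentials and the target bucket
```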
  2. Create a deployment package:
zip -j postgresql_audit_log_processor.zip postgresql_audit_log_processor.py
  3. Deploy the Lambda function:
aws lambda create-function \
  --function-name PostgreSQLAuditLogProcessor \
  --runtime python3.9 \
  --role arn:aws:iam::<YOUR_ACCOUNT_ID>:role/PostgreSQLAuditLogProcessorRole \
  --handler postgresql_audit_log_processor.lambda_handler \
  --description "Lambda function to process PostgreSQL audit logs and append to CSV in S3" \
  --timeout 60 \
  --memory-size 128 \
  --zip-file fileb://postgresql_audit_log_processor.zip \
  --region <YOUR_REGION>

Step 3: Add CloudWatch Logs Permission

Grant CloudWatch Logs permission to invoke the Lambda function:

aws lambda add-permission \
  --function-name PostgreSQLAuditLogProcessor \
  --statement-id CloudWatchLogsToLambda \
  --action lambda:InvokeFunction \
  --principal logs.<YOUR_REGION>.amazonaws.com \
  --source-arn "arn:aws:logs:<YOUR_REGION>:<YOUR_ACCOUNT_ID>:log-group:/aws/rds/cluster/<YOUR_CLUSTER_NAME>/postgresql:*" \
  --region <YOUR_REGION>

Step 4: Create CloudWatch Logs Subscription Filter

Create a subscription filter to trigger the Lambda function when audit logs are detected:

aws logs put-subscription-filter \
  --filter-name PostgreSQLAuditLogFilter \
  --filter-pattern "AUDIT" \
  --log-group-name "/aws/rds/cluster/<YOUR_CLUSTER_NAME>/postgresql" \
  --destination-arn "arn:aws:lambda:<YOUR_REGION>:<YOUR_ACCOUNT_ID>:function:PostgreSQLAuditLogProcessor" \
  --region <YOUR_REGION>
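To confirm the filter is attached, you can list the log group's subscription filters, e.g. with a small boto3 helper like this sketch (the log group name in the usage line is a placeholder):

```python
def audit_filter_active(logs_client, log_group):
    """Return True if the PostgreSQLAuditLogFilter subscription filter is attached."""
    resp = logs_client.describe_subscription_filters(logGroupName=log_group)
    return any(f['filterName'] == 'PostgreSQLAuditLogFilter'
               for f in resp['subscriptionFilters'])

# Usage (requires boto3 and AWS credentials):
#   import boto3
#   audit_filter_active(boto3.client('logs'),
#                       '/aws/rds/cluster/<YOUR_CLUSTER_NAME>/postgresql')
```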

Step 5: Test the Solution

  1. Generate actual audit logs by connecting to your PostgreSQL database and executing some SQL commands:
-- Create a test table
CREATE TABLE test_audit_table (id serial PRIMARY KEY, name text, created_at timestamp DEFAULT current_timestamp);

-- Insert some data
INSERT INTO test_audit_table (name) VALUES ('Test Record 1');
INSERT INTO test_audit_table (name) VALUES ('Test Record 2');

-- Update data
UPDATE test_audit_table SET name = 'Updated Record' WHERE id = 1;

-- Delete data
DELETE FROM test_audit_table WHERE id = 2;
  2. Check if the audit logs have been processed and exported to S3:
# List files in S3 bucket
aws s3 ls s3://<YOUR_BUCKET_NAME>/audit_logs/ --recursive --region <YOUR_REGION>

# Download and view CSV file
aws s3 cp s3://<YOUR_BUCKET_NAME>/audit_logs/$(date +%Y/%m/%d)/audit_logs_daily.csv audit_logs_daily.csv --region <YOUR_REGION>
cat audit_logs_daily.csv
  3. The output in the CSV file should look like this:
timestamp,log_level,user,database,action,object_type,object_name,query
2025-07-06T05:55:03,INFO,postgres,postgres,INSERT,TABLE,public.test1,insert into test1 values (6);
2025-07-06T05:55:08,INFO,postgres,postgres,INSERT,TABLE,public.test1,insert into test1 values (7);
2025-07-06T05:55:12,INFO,postgres,postgres,UPDATE,TABLE,public.test1,update test1 set id=10 where id<5;
2025-07-06T06:00:46,INFO,postgres,postgres,INSERT,TABLE,public.test1,insert into test1 values (99);
2025-07-06T06:00:50,INFO,postgres,postgres,INSERT,TABLE,public.test1,insert into test1 values (88);
2025-07-06T06:00:54,INFO,postgres,postgres,INSERT,TABLE,public.test1,insert into test1 values (77);

CSV Format and Data Structure

The exported CSV files include the following columns:

  • timestamp: ISO 8601 timestamp of the audit log entry
  • log_level: Log level (INFO, WARNING, ERROR, etc.)
  • user: Database user who performed the action
  • database: Database where the action was performed
  • action: Type of action (SELECT, INSERT, UPDATE, DELETE, etc.)
  • object_type: Type of object affected (TABLE, INDEX, etc.)
  • object_name: Name of the object affected
  • query: Full SQL query that was executed
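Once the daily files are in S3, any CSV tooling can consume them. As a minimal example, this sketch filters exported rows by action with Python's csv module; the sample data is illustrative, mirroring the format above:

```python
import csv
import io

# Illustrative snippet of an exported daily file
sample_csv = """timestamp,log_level,user,database,action,object_type,object_name,query
2025-07-06T05:55:03,INFO,postgres,postgres,INSERT,TABLE,public.test1,insert into test1 values (6);
2025-07-06T05:55:12,INFO,postgres,postgres,UPDATE,TABLE,public.test1,update test1 set id=10 where id<5;
"""

rows = list(csv.DictReader(io.StringIO(sample_csv)))
inserts = [r for r in rows if r['action'] == 'INSERT']
print(f"{len(inserts)} INSERT statement(s) by {inserts[0]['user']}")
```

The same files can also be queried in place with Athena or loaded into Redshift, as noted in the benefits section.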

Troubleshooting

Common Issues and Solutions

  1. Lambda function not being triggered:

    • Verify the CloudWatch Logs subscription filter is correctly configured
    • Check that the log group name matches your Aurora PostgreSQL cluster's log group
    • Ensure the filter pattern "AUDIT" is appropriate for your audit logs
  2. Lambda function failing:

    • Check Lambda function logs in CloudWatch Logs
    • Verify IAM permissions for accessing S3 and CloudWatch Logs
    • Ensure the S3 bucket exists and is accessible
  3. No audit logs in CloudWatch:

    • Verify pgAudit extension is enabled in your PostgreSQL cluster
    • Check that audit logging parameters are correctly configured
    • Ensure log export to CloudWatch Logs is enabled for your RDS/Aurora cluster
  4. CSV files not appearing in S3:

    • Check Lambda function execution logs for errors
    • Verify S3 bucket name and prefix in the Lambda function code
    • Ensure Lambda has proper permissions to write to the S3 bucket

Debugging Tips

  1. Enable detailed CloudWatch Logs for the Lambda function
  2. Test the Lambda function with a sample event
  3. Check S3 bucket permissions and policies
  4. Verify the format of audit logs in CloudWatch Logs

Cost Optimization

  1. Lambda Optimization: Adjust memory allocation based on processing needs
  2. S3 Storage Classes: Use appropriate storage classes for different retention periods
  3. CloudWatch Logs Retention: Set appropriate retention periods for CloudWatch Logs
  4. Filtering: Adjust the subscription filter to only process relevant audit logs