Use generic logic to manage data warehouse change data capture (CDC) in Amazon Redshift

15 minute read
Content level: Advanced

This article provides a method of creating a generic procedure to implement change data capture (CDC) into slowly changing dimension type 2 (SCD2) tables on Amazon Redshift.

Introduction

It is common for customers to track changes on dimensional entities within a data warehouse. The process of tracking changes against rows in database tables is commonly referred to as change data capture, or CDC. Implementing CDC processes within the data warehouse enables point-in-time reporting and enhanced traceability and auditability of data changes, and is a key enabler for time-series-based machine learning (ML) use cases.

In this article, we create and execute a generic stored procedure that takes a set of inputs such as source and target table names to maintain slowly changing dimension type 2 (SCD2) tables. The procedure can be extended and updated to provide functionality specific to your data modelling conventions.

Scenario

Consider a fictitious company that operates its data warehouse in Amazon Redshift. The company is looking to implement SCD2 structures on the modelled schema of its data warehouse so that users can perform point-in-time reporting and view how records change over time. Specifically, it wants to maintain three audit columns in each of its modelled tables:

  • effective_from_timestamp: this column will be populated with the time the record became active in the table; that is, when it was loaded into the table.
  • effective_to_timestamp: this column will be populated with the time the record became inactive in the table. If the record is currently active, this column will be populated with a date arbitrarily far into the future (9999-12-31). If the record is currently inactive, this column will be populated with the time the record became inactive in the warehouse. A record will become inactive if it is updated or deleted in the source table.
  • active_indicator: this column holds a boolean value of true when the record is active in the table, and false when the record is inactive.

Using the three audit columns, this company will always be able to identify which records are active by adding the following filter:

  • where active_indicator = true

This company will also be able to see what the value of a record was at any point in time by applying the following filter:

  • where '2024-01-01'::date between effective_from_timestamp and effective_to_timestamp

These audit columns also serve to show changes over time to assist with any troubleshooting or debugging in datasets.
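For example, assuming a hypothetical modelled.customer table that carries these audit columns, both filters can be applied as ordinary predicates:

```sql
-- Current state of every customer (table name is hypothetical)
select customer_id, customer_preferred_name
from modelled.customer
where active_indicator = true;

-- State of every customer as at 1 January 2024
select customer_id, customer_preferred_name
from modelled.customer
where '2024-01-01'::date between effective_from_timestamp and effective_to_timestamp;
```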

Note that this article will provide a stored procedure to perform full CDC between a source and target. If partial CDC or more custom behaviour is desired please let us know in the comments.

Example behaviour

Assume this company has a source table that holds each customer's preferred name. A preferred name can change over time, and the company would like to be able to see each customer's preferred name over time.

The below table shows an example of what this table could look like. It has a customer_id column, which is the primary key, and a customer_preferred_name column.

Structure of the fictitious customer table.

Upon initial load, the modelled SCD2 table would appear as in the below figure. Note that the audit fields have been appended with timestamps that show when the active records were inserted.

Structure of customer table with audit columns appended.

Now, assume there were some changes to the table on the 17th January 2024. These changes include an update to Katherine’s preferred name (now Kate), and the addition of a new customer, Rashi. The new source table is as shown below, with the changes highlighted.

Source customer table with updated records.

After running an SCD2 process to merge these changes, the modelled table would appear as in the below figure. Changes to the table are highlighted for your convenience.

Final updated modelled table.

With this behaviour, it is possible to identify which records were active at any point-in-time.
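To make this concrete, after the 17 January changes the modelled table would contain rows along the following lines. The initial load timestamp shown here is hypothetical, as only the 17 January change date is given above; note that closed records end one microsecond before the run that closed them.

```
customer_id | customer_preferred_name | effective_from_timestamp | effective_to_timestamp     | active_indicator
1           | Katherine               | 2024-01-10 09:00:00      | 2024-01-17 08:59:59.999999 | false
1           | Kate                    | 2024-01-17 09:00:00      | 9999-12-31 23:59:59.999999 | true
2           | Rashi                   | 2024-01-17 09:00:00      | 9999-12-31 23:59:59.999999 | true
```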

Prerequisites

To execute the steps in this article as-is, you will need the following:

  • An Amazon Redshift data warehouse. This can be either provisioned or Redshift Serverless.
  • A default IAM role attached to the Redshift cluster that can access the file located at s3://awssampledbuswest2/tickit/listings_pipe.txt
  • A Redshift database user with the following permissions:
    • Create schema
    • Create table
    • Usage on language plpgsql

Stored procedures

Stored procedures are commonly used to encapsulate logic for data transformation, data validation, and business-specific logic. By combining multiple SQL steps into a stored procedure, you can reduce round trips between your applications and the database.

Here, you will use a stored procedure to dynamically create a series of SQL statements and then execute them on your Redshift data warehouse.
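As a minimal sketch of this pattern (the procedure name and parameter are hypothetical, not part of the solution below), a Redshift stored procedure can build a statement as a string, run it with EXECUTE, and capture the result:

```sql
-- Minimal sketch of dynamic SQL in a Redshift stored procedure (hypothetical example)
CREATE OR REPLACE PROCEDURE p_row_count(full_table_name varchar)
AS $$
DECLARE
    row_count integer;
BEGIN
    -- Build the statement as a string, then execute it and capture the result
    EXECUTE 'SELECT count(*) FROM ' || full_table_name INTO row_count;
    RAISE INFO 'Rows in %: %', full_table_name, row_count;
END;
$$ LANGUAGE plpgsql;

-- Example invocation:
-- call p_row_count('stage.tickit_listing');
```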

Solution overview and implementation

The stored procedure has the following mandatory inputs:

  • source_table_name: the name of the table to be used as the source for applying the changes. This table must have the same column names as the target table.
  • source_schema_name: the name of the schema the source table is stored in.
  • target_table_name: the name of the table to be used as the target for applying changes. The target table must have the following attributes:
    • A primary key defined. This can be one or more columns.
    • An audit column called effective_from_timestamp
    • An audit column called effective_to_timestamp
    • An audit column called active_indicator
  • target_schema_name: the name of the schema the target table is stored in.

Using the above inputs, the stored procedure will perform the following:

  1. Declare a number of variables to be used throughout the stored procedure.
  2. Query the information_schema views to identify the target table's primary key columns.
  3. Use the list from step 2 to create:
    1. pk_compare_string: a primary key compare string. This is used as a join condition in steps 7 and 8.
    2. src_pk_cols_null: a condition that checks if the primary key column in the source table is null post-join with the target table, indicating this record is a deleted record (i.e. it no longer exists in the source). This is used to determine the record change type in step 7.
    3. tgt_pk_cols_null: a condition that checks if the primary key column in the target table is null post-join with the source table, indicating this record is a new insert (i.e. it doesn’t exist in the target yet). This is used to determine the record change type in step 7.
  4. Use the pg_get_cols function to get a list of columns for the target table, excluding the audit columns.
  5. Use the list of columns from step 4 to create:
    1. update_compare_string: a string that contains a list of OR conditions. These conditions are checking if the source column’s value is different to the target column’s value for any record defined by its primary key. This is used to determine the record change type in step 7.
    2. column_select_string: a string that contains logic to write either the source or target values into the temporary table created in step 7.
    3. column_list_string: a list of comma separated column names. This is used in steps 7 and 9.
    4. src_column_select_string: a list of column names prefixed with src_ to identify the source values. This is used in step 7.
    5. tgt_column_select_string: a list of column names prefixed with tgt_ to identify the target values. This is used in step 7.
  6. Drop a temporary table if it already exists. This temporary table might exist from previous executions of the stored procedure.
  7. Create a temporary table to store the changed records, with a column that identifies if the record is a new insert, an updated record, or a deleted record.
  8. Close any records that have been updated or deleted in the source table by updating the active_indicator and effective_to_timestamp columns in the target table.
  9. Insert any new records with appropriate audit column values.
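As an illustration of steps 2 through 5, for the hypothetical customer table used earlier (primary key customer_id plus one tracked column customer_preferred_name), the generated strings would look roughly like this:

```sql
-- Illustrative values only, derived from the customer example above
-- pk_compare_string:     src.customer_id = tgt.customer_id
-- src_pk_cols_null:      src.customer_id IS NULL
-- tgt_pk_cols_null:      tgt.customer_id IS NULL
-- update_compare_string: src.customer_id <> tgt.customer_id OR src.customer_preferred_name <> tgt.customer_preferred_name
-- column_list_string:    customer_id, customer_preferred_name
```

Note that the primary key columns also appear in update_compare_string, because the column loop in step 4 excludes only the audit columns; this is harmless since the join already matches source and target on primary key equality.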

To deploy the stored procedure, copy the entire code below into a SQL editor connected to Amazon Redshift and run it.

CREATE OR REPLACE PROCEDURE p_detect_changes(
        source_table_name varchar, 
        source_schema_name varchar, 
        target_table_name varchar, 
        target_schema_name varchar
        )
AS 
$$
DECLARE
        column_list record;
        pk_column_list record;
        pk_column_string varchar(MAX) := '';
        pk_compare_string varchar(MAX) := '';
        update_compare_string varchar(MAX) := '';
        column_select_string varchar(MAX) := '';
        src_column_select_string varchar(MAX) := '';
        tgt_column_select_string varchar(MAX) := '';
        src_pk_cols_null varchar(MAX) := '';
        tgt_pk_cols_null varchar(MAX) := '';
        column_list_string varchar(MAX) := '';
        runtime timestamp;
        num_updates integer := 0;
        num_inserts integer := 0;
        num_deletes integer := 0;
    
        /* Variables used for decisions and testing */
        var1 smallint := 0;

BEGIN
        
    /* Initialise decision variable */
    var1 = 0;
    
    /* Create a string containing primary key columns of the target table by looping through the information schema tables containing this information, and building up a string */
    FOR pk_column_list IN 
                EXECUTE 
                'SELECT     kcu.column_name '
                || 'FROM    information_schema.table_constraints tco '
                || 'JOIN    information_schema.key_column_usage kcu '
                || 'ON              kcu.constraint_name = tco.constraint_name '
                || 'AND     kcu.constraint_schema = tco.constraint_schema '
                || 'WHERE   tco.constraint_type = ''PRIMARY KEY'' '
                || 'AND     tco.table_name = ' || quote_literal(target_table_name)
                || ' AND    tco.table_schema = ' || quote_literal(target_schema_name)
                || ' AND    kcu.column_name NOT IN (''effective_from_timestamp'',''active_indicator'',''effective_to_timestamp'')'
    LOOP 
                /* Create generic pk string list */
                pk_column_string = pk_column_string + pk_column_list.column_name + ',';
                
                /* Choose how to create pk_compare_string (to be used in joins later on) based on if this is the first pk column */
                IF var1 = 0
                THEN    
                        pk_compare_string = pk_compare_string + ' src.' + pk_column_list.column_name + ' = tgt.' + pk_column_list.column_name;
                        src_pk_cols_null = 'src.' + pk_column_list.column_name + ' IS NULL ';
                        tgt_pk_cols_null = 'tgt.' + pk_column_list.column_name + ' IS NULL ';
                ELSE    
                        pk_compare_string = pk_compare_string + ' AND src.' + pk_column_list.column_name + ' = tgt.' + pk_column_list.column_name;
                END IF;

                /* Signal end of first pk column */
                var1 = 1;

    END LOOP;
    
    /* Initialise decision variable */
    var1 = 0;

    /* Create a string containing columns of the target table by looping through the response from pg_get_cols and building up a string */
    FOR column_list IN 
                EXECUTE
                'SELECT     col_name::varchar '
                || 'FROM    pg_get_cols(''' || target_schema_name || '.' || target_table_name || ''') '
                || '        cols(table_schema name, table_name name, col_name name, col_type varchar, col_num int) '
                || 'WHERE   col_name NOT IN (''effective_from_timestamp'',''active_indicator'',''effective_to_timestamp'')'
    LOOP
                IF var1 = 0
                THEN 
                        update_compare_string = update_compare_string + ' src.' + column_list.col_name + ' <> tgt.' + column_list.col_name;
                        column_select_string = column_select_string + ' CASE WHEN oper_type = ''D'' THEN tgt_' + column_list.col_name + ' ELSE src_' + column_list.col_name + ' END AS ' || column_list.col_name; 
                        column_list_string = column_list.col_name; 
                        src_column_select_string = src_column_select_string + ' src.' + column_list.col_name + ' AS src_' + column_list.col_name;
                        tgt_column_select_string = tgt_column_select_string + ' tgt.' + column_list.col_name + ' AS tgt_' + column_list.col_name;
                ELSE 
                        update_compare_string = update_compare_string + ' OR src.' + column_list.col_name + ' <> tgt.' + column_list.col_name;
                        column_select_string = column_select_string + ', CASE WHEN oper_type = ''D'' THEN tgt_' + column_list.col_name + ' ELSE src_' + column_list.col_name + ' END AS ' || column_list.col_name;
                        column_list_string = column_list_string + ', ' + column_list.col_name;
                        src_column_select_string = src_column_select_string + ', src.' + column_list.col_name + ' AS src_' + column_list.col_name;
                        tgt_column_select_string = tgt_column_select_string + ', tgt.' + column_list.col_name + ' AS tgt_' + column_list.col_name;
                END IF;

                var1 = 1;

    END LOOP;

    /* 
        Assumes that the source table is fully refreshed with a new version of the source data every run
        
        Create a temporary table with the required insert (I)/update (U)/delete (D) flags for each row to be changed     
    */
    EXECUTE 
    'DROP TABLE IF EXISTS temp_' || target_table_name;

    EXECUTE
    'CREATE TEMPORARY TABLE temp_' || target_table_name || ' as '
    || '(SELECT ' || column_select_string || ', oper_type '
    || 'FROM    ( '
    || '        SELECT  ' || src_column_select_string || ',' || tgt_column_select_string || ','
                    /* 
                        If the target PK column is null, then it must not exist in the final table yet. Therefore is an insert.
                        If the source PK column is null, then it must not exist in the source table. Therefore it has been deleted from source.
                        If the update compare string is different (i.e. one of the values in the non-PK columns is different between source and target), it is an update.
                        Otherwise, the record is the same between source and target and there is no update.
                    */
    || '                CASE WHEN ' || tgt_pk_cols_null || ' THEN ''I''' 
    || '                        WHEN ' || src_pk_cols_null || ' THEN ''D'''
    || '                        WHEN (' || update_compare_string || ') THEN ''U'''
    || '                        ELSE ''X'''
    || '                END AS oper_type'
    || '        FROM    ' || source_schema_name || '.' || source_table_name || ' src'
    || '        FULL OUTER JOIN (select * from ' || target_schema_name || '.' || target_table_name || ' where active_indicator) tgt'
    || '        ON      ' || pk_compare_string
    || '        )'
    || 'WHERE   oper_type <> ''X'')';

    /* Raise INFO about updates being made */
    EXECUTE 'SELECT nvl(count(*),0) from temp_' || target_table_name ||' where oper_type = ''U''' INTO num_updates;
    EXECUTE 'SELECT nvl(count(*),0) from temp_' || target_table_name ||' where oper_type = ''D''' INTO num_deletes;
    EXECUTE 'SELECT nvl(count(*),0) from temp_' || target_table_name ||' where oper_type = ''I''' INTO num_inserts;

    RAISE INFO 'Number of updates: %', num_updates;
    RAISE INFO 'Number of deletes: %', num_deletes;
    RAISE INFO 'Number of inserts: %', num_inserts;

    /* Execute the updates and deletes, and then execute the inserts */
    SELECT INTO runtime timeofday();

    EXECUTE
    'UPDATE ' || target_schema_name || '.' || target_table_name || ' tgt'
    || ' SET active_indicator = false, effective_to_timestamp = dateadd(microsecond, -1, ' || quote_literal(runtime) || ')'
    || ' FROM temp_' || target_table_name || ' src'
    || ' WHERE ' || pk_compare_string
    || ' AND src.oper_type IN (''D'', ''U'')'
    || ' AND tgt.active_indicator';

    EXECUTE
    'INSERT INTO ' || target_schema_name || '.' || target_table_name
    || ' SELECT ' || column_list_string || ', ' || quote_literal(runtime) || ' as effective_from_timestamp, ''9999/12/31 23:59:59.999999''::timestamp as effective_to_timestamp, true as active_indicator'
    || ' FROM temp_' || target_table_name || ' src'
    || ' WHERE src.oper_type IN (''I'', ''U'')';

END;
$$ LANGUAGE plpgsql;

Validation steps

After creating the stored procedure you can run the below SQL statements to perform the following:

  1. Create a source and target schema
  2. Create a source and target table
  3. Load data from the sample tickit dataset to the source table
  4. Execute the stored procedure
  5. Run some validations on the target table
  6. Update, delete, and insert records in the source table
  7. Execute the stored procedure
  8. Run final validations on the target table

create schema stage;
create schema modelled;

CREATE TABLE stage.tickit_listing (
    listid integer NOT NULL ENCODE az64 distkey,
    sellerid integer NOT NULL ENCODE az64,
    eventid integer NOT NULL ENCODE az64,
    dateid smallint NOT NULL ENCODE raw,
    numtickets smallint NOT NULL ENCODE az64,
    priceperticket numeric(8, 2) ENCODE az64,
    totalprice numeric(8, 2) ENCODE az64,
    listtime timestamp without time zone ENCODE az64,
    PRIMARY KEY (listid)
) DISTSTYLE KEY
SORTKEY (dateid);

CREATE TABLE modelled.tickit_listing (
    listid integer NOT NULL ENCODE az64 distkey,
    sellerid integer NOT NULL ENCODE az64,
    eventid integer NOT NULL ENCODE az64,
    dateid smallint NOT NULL ENCODE raw,
    numtickets smallint NOT NULL ENCODE az64,
    priceperticket numeric(8, 2) ENCODE az64,
    totalprice numeric(8, 2) ENCODE az64,
    listtime timestamp without time zone ENCODE az64,
    effective_from_timestamp timestamp without time zone NOT NULL ENCODE az64,
    effective_to_timestamp timestamp without time zone NOT NULL ENCODE az64,
    active_indicator boolean NOT NULL ENCODE raw,
    PRIMARY KEY (listid, effective_from_timestamp)
) DISTSTYLE KEY
SORTKEY (active_indicator, dateid);

COPY stage.tickit_listing
FROM 's3://awssampledbuswest2/tickit/listings_pipe.txt'
IAM_ROLE default
region 'us-west-2';
/*
Expected output:
    Load into table 'tickit_listing' completed, 192497 record(s) loaded successfully.
*/

call p_detect_changes('tickit_listing', 'stage', 'tickit_listing', 'modelled');
/* 
Expected output: 
    Info:
    Number of updates: 0
    Number of deletes: 0
    Number of inserts: 192497
*/

select * from modelled.tickit_listing;
select count(*) from modelled.tickit_listing; -- 192497
select count(*), active_indicator from modelled.tickit_listing group by active_indicator; -- 192497    true

update stage.tickit_listing set numtickets = numtickets + 5 where listid < 1000; -- Affected rows: 922
delete from stage.tickit_listing where listid > 234000; -- Affected rows: 1377
insert into stage.tickit_listing (select listid + 2000 as listid, sellerid, eventid, dateid, numtickets, priceperticket, totalprice, listtime from stage.tickit_listing where listid > 233000); -- Affected rows: 900

call p_detect_changes('tickit_listing', 'stage', 'tickit_listing', 'modelled');
/*
Expected output: 
    Info:
    Number of updates: 1327
    Number of deletes: 972
    Number of inserts: 495
*/

select count(*) from modelled.tickit_listing; -- 194319
select count(*), active_indicator from modelled.tickit_listing group by active_indicator;
/* 
Expected output:
    count    active_indicator
    192020    true    
    2299    false    
*/
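You can also inspect the change history of an individual record. Any listid below 1000 was updated in step 6, so it should now show one closed row and one active row (substitute any listid below 1000):

```sql
-- History of a single listing after the second procedure run
select listid, numtickets, effective_from_timestamp, effective_to_timestamp, active_indicator
from modelled.tickit_listing
where listid = 1
order by effective_from_timestamp;
```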

Conclusion

In this article, we created a custom, generic stored procedure that maintains your data warehouse SCD2 structures with full CDC between a source and target object. It is extensible and can be modified to comply with your data modelling conventions.

This stored procedure can be used in your regular workload to maintain SCD2 objects.

Please leave a comment below if you are interested in seeing any further features added to this stored procedure.

Clean-up

Run the following SQL to remove all created objects:

DROP SCHEMA stage CASCADE;
DROP SCHEMA modelled CASCADE;
DROP PROCEDURE p_detect_changes(
        source_table_name varchar, 
        source_schema_name varchar, 
        target_table_name varchar, 
        target_schema_name varchar);

Authors

Sean Beath - Specialist Redshift Solutions Architect

Rick Fraser - Specialist Data Solutions Architect