
Complex Materialized View creation using Redshift Nested Materialized Views

8 minute read
Content level: Expert

We will demonstrate how to use nested materialized views in Redshift in situations where the base materialized view SQL is complex and contains many large subqueries, making the materialized view difficult or impossible to create in one step.

This post will explore an example of how customers can use Redshift to sift through massive amounts of data to gain valuable insights. The calculations performed by the example SQL are quite complex and require scanning tens of terabytes of data. Performing these calculations from scratch in every SQL statement would take a long time and would not be practical for the team's use. Putting this SQL under a materialized view, however, lets the customer team pull these results quickly and easily, since the materialized view performs most of the scanning, aggregation, and joining of the data up front:

SELECT A.user_id, 
       A.adate,
       A.vscore_4, 
       A.i_12_mths, 
       A.user_age_archive, 
       A.user_iscore,
       Aa.month_since_last_mtg_open, 
       Ab.zip_code,       
       Ac.state
FROM
(
    SELECT distinct user_id, adate,
        median(vscore_4) OVER (PARTITION BY user_id, adate) as vscore_4,
        median(i_12_mths) OVER (PARTITION BY user_id, adate) as i_12_mths,   
        median(user_age_archive) OVER (PARTITION BY user_id, adate) as user_age_archive, 
        median(user_iscore) OVER (PARTITION BY user_id, adate) as user_iscore
    FROM fact_all
    WHERE stability < 100000000 and high_standing < 100000000
) as A
LEFT JOIN
(
    SELECT user_id, adate,
    12*(date_part(year, adate)-trunc(max(start_date)/100)) + date_part(month, adate)-(max(start_date)-trunc(max(start_date)/100)) as month_since_last_mtg_open
    FROM fact_all
    WHERE classification = 'FM' and stability < 100000000 and high_standing < 100000000
    GROUP BY user_id, adate
) as Aa
ON A.user_id = Aa.user_id and A.adate = Aa.adate
LEFT JOIN
(
    SELECT distinct user_id, adate,
        median(cast(zip_code as int)) OVER (PARTITION BY user_id, adate) as zip_code       
    FROM fact_all
    WHERE zip_code!='' and stability < 100000000 and high_standing < 100000000
) as Ab
ON A.user_id = Ab.user_id and A.adate = Ab.adate
LEFT JOIN
(
    WITH df_temp AS
    (SELECT user_id, adate, state, 
            ROW_NUMBER() OVER (PARTITION BY user_id, adate ORDER BY count(state) DESC) AS rn
        FROM 
           (
            select user_id, adate, state 
            from fact_all
            )
        group by user_id, adate, state
    )
    SELECT user_id, adate, state FROM df_temp
    WHERE rn = 1
) as Ac
ON A.user_id = Ac.user_id and A.adate = Ac.adate

We can see this SQL consists of 4 subqueries, each of which is on the same base table, fact_all, which is 34 TB in size. The result sets from each of the subqueries are also quite large, and these result sets are then joined to produce the combined results. When this SQL was put into a materialized view, the view failed to create: even on a very large cluster, it failed after 24 hours. Why was this the case? Let us look further.

As we can see from the snippet of the query plan below, the joining of the subquery results is where the problem arises. These joins end up as multiple broadcasts (DS_BCAST_INNER), a very expensive operation that requires the rows of a result set to be copied to every node in the cluster. See the Amazon Redshift documentation for a detailed explanation of data redistribution and of evaluating query plans.

XN Hash Left Join DS_BCAST_INNER (cost=6346975720280.07..45136252742305562230784.00 rows=563980926304913522688 width=252)
  Hash Cond: (("outer".adate = "inner".adate) AND ("outer".user_id = "inner".user_id))
  ->  XN Hash Left Join DS_BCAST_INNER  (cost=5334882864421.78..12135809249447387136.00 rows=151659946706240544 width=186)
        Hash Cond: (("outer".adate = "inner".adate) AND ("outer".user_id = "inner".user_id))
        ->  XN Hash Left Join DS_BCAST_INNER  (cost=4265524403175.25..260676123715712.22 rows=2946900245290 width=154)
              Hash Cond: (("outer".adate = "inner".adate) AND ("outer".user_id = "inner".user_id))
              ->  XN Subquery Scan a  (cost=4260211338128.82..4275848531933.52 rows=2058623683 width=146)
                    ->  XN Unique  (cost=4260211338128.82..4275827945696.69 rows=2058623683 width=84)

Broadcast joins occur when the data is not collocated. This problem is usually addressed by configuring proper distribution keys on the underlying tables. However, in this case what is being joined are the result sets of subqueries, not the tables themselves. Each subquery entails a subquery scan operation, on top of window and DISTINCT functions; distribution keys do not persist through this layer, causing the broadcasts to occur. This becomes a significant hurdle in creating this materialized view: it timed out after running for 24 hours on a provisioned cluster, and even on a serverless endpoint with maximum RPUs it timed out after 6 hours. The subqueries themselves did not take too long to run; it is the joining that causes the major bottleneck, since DS_BCAST_INNER copies the entire inner result set to all the compute nodes.
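You can spot this pattern before committing to a long-running CREATE by running EXPLAIN on the defining SELECT and looking for DS_BCAST_INNER on the join steps. Below is a minimal sketch using a trimmed-down stand-in for the full statement; the two derived tables here are illustrative, not the article's exact subqueries:

-- Show the plan without executing the query. A DS_BCAST_INNER attribute on a
-- join step means the inner result set will be copied to every compute node.
EXPLAIN
SELECT a.user_id, a.adate, b.zip_code
FROM (SELECT distinct user_id, adate FROM fact_all) as a
LEFT JOIN (SELECT distinct user_id, adate, zip_code FROM fact_all) as b
ON a.user_id = b.user_id AND a.adate = b.adate;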

[Image: Failure due to broadcast]

Solution Summary

To solve this problem, we used nested materialized views to apply distribution keys to the intermediate result sets. Take a look at this alternate approach, which creates the same materialized view as above, but now in 2 separate steps:

  1. Create Materialized Views for each subquery

  2. Create the nested MV, which joins the subquery MVs (a minimal sketch follows; see the appendix for the full code).
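Here is a minimal sketch of the shape of these two steps. The MV names (sub_a, sub_b, parent_sketch) and the trimmed column lists are illustrative only; the full definitions are in the appendix:

-- Step 1 (sketch): persist each subquery as its own MV with an explicit
-- distribution key, so its result set is stored collocated for the later join.
create materialized view sub_a distkey(user_id) sortkey(user_id) as
SELECT distinct user_id, adate,
       median(vscore_4) OVER (PARTITION BY user_id, adate) as vscore_4
FROM fact_all;

create materialized view sub_b distkey(user_id) sortkey(user_id) as
SELECT distinct user_id, adate,
       median(cast(zip_code as int)) OVER (PARTITION BY user_id, adate) as zip_code
FROM fact_all
WHERE zip_code != '';

-- Step 2 (sketch): the nested MV joins the subquery MVs on their shared
-- distribution key, so the join requires no redistribution.
create materialized view parent_sketch distkey(user_id) as
SELECT a.user_id, a.adate, a.vscore_4, b.zip_code
FROM sub_a a
LEFT JOIN sub_b b ON a.user_id = b.user_id AND a.adate = b.adate;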

[Image: Nested MV diagram]

Each subquery now becomes a materialized view. The key advantage of this approach is that we can now persist the distribution key of each subquery result set right in the materialized view definition itself. Just like tables, Redshift materialized views can have both sort keys and distribution keys defined as attributes. Once the subquery materialized views are created with the distribution key USER_ID_ADATE, we create the nested materialized view to join them together. This join is now an extremely fast operation: no redistribution is required, since the data for the joins is collocated. The initial view creation failed after 24 hours, but this nested MV approach took less than 2 hours! For a deeper discussion of data redistribution, refer to the Amazon Redshift Developer Guide.
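To verify the collocation yourself, you can EXPLAIN the join between the subquery MVs (this sketch assumes the appendix definitions are in place); with matching distribution keys, the join steps should show DS_DIST_NONE instead of DS_BCAST_INNER:

-- DS_DIST_NONE on the join step confirms that no data movement is needed.
EXPLAIN
SELECT s1.user_id, s1.adate
FROM subquery1 s1
LEFT JOIN subquery2 s2 ON s1.user_id_adate = s2.user_id_adate;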

As an additional benefit, this method allows a parallel approach. Since step #1 entails multiple materialized views created independently of each other, they can all be built in parallel using multiple threads. If the cluster has enough compute capacity to handle the processing, all of the step #1 MVs can be created simultaneously. This saves even more time, as we are now bound only by the longest subquery plus the final nested MV creation. See the timings of the different subquery MVs below.

Operation Name                   Duration
Consumer level Subquery1 MV      1 hr 41 min
Consumer level Subquery2 MV      3.5 min
Consumer level Subquery3 MV      1 hr 23 min
Consumer level Subquery4 MV      36 min
Consumer level Parent combine    8 min

The longest MVs were #1 and #3. All 4 subquery MVs were created in parallel, and the parent combined MV then took only 8 minutes, so the entire create (or refresh) operation is about 1 hour and 50 minutes. This allowed the customer analytics team to harness their large volumes of data and offer it to their end users in a user-friendly, easy-to-consume manner.
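The same pattern applies at refresh time: the subquery MVs can be refreshed concurrently from separate sessions, followed by the parent. A sketch using this article's MV names:

-- Run each of these from its own session/connection so they execute in parallel.
REFRESH MATERIALIZED VIEW subquery1;
REFRESH MATERIALIZED VIEW subquery2;
REFRESH MATERIALIZED VIEW subquery3;
REFRESH MATERIALIZED VIEW subquery4;

-- Once all four complete, refresh the parent so it picks up the new data.
REFRESH MATERIALIZED VIEW fact_all_mv;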

Conclusion

Amazon Redshift's scalability, performance, and rich capabilities such as nested materialized views allow you to easily and quickly add value to your business by delivering valuable insights with extremely fast performance.

Appendix

Subquery Materialized Views:

create materialized view subquery1 distkey(user_id_adate) sortkey(user_id_adate) as
SELECT distinct user_id, adate, user_id || '_' || adate as user_id_adate,
        median(vscore_4) OVER (PARTITION BY user_id, adate) as vscore_4,
        median(i_12_mths) OVER (PARTITION BY user_id, adate) as i_12_mths,   
        median(user_age_archive) OVER (PARTITION BY user_id, adate) as user_age_archive, 
        median(user_iscore) OVER (PARTITION BY user_id, adate) as user_iscore
    FROM fact_all
    WHERE stability < 100000000 and high_standing < 100000000
;

create materialized view subquery2 distkey(user_id_adate) sortkey(user_id_adate) as
    SELECT user_id, adate, user_id || '_' || adate as user_id_adate,
    12*(date_part(year, adate)-trunc(max(origination_date_open)/100)) + date_part(month, adate)-(max(origination_date_open)-trunc(max(origination_date_open)/100)) as month_since_last_mtg_open
    FROM fact_all
    WHERE classification = 'FM' and stability < 100000000 and high_standing < 100000000
    GROUP BY user_id, adate
;


create materialized view subquery3 distkey(user_id_adate) sortkey(user_id_adate) as
    SELECT distinct user_id, adate, user_id || '_' || adate as user_id_adate,
        median(cast(zip_code as int)) OVER (PARTITION BY user_id, adate) as zip_code       
    FROM fact_all
    WHERE zip_code!='' and stability < 100000000 and high_standing < 100000000
;

create materialized view subquery4 distkey(user_id_adate) sortkey(user_id_adate) as
WITH df_temp AS
    (SELECT user_id, adate, user_id || '_' || adate as user_id_adate, state, 
            ROW_NUMBER() OVER (PARTITION BY user_id, adate ORDER BY count(state) DESC) AS rn
        FROM 
           (
            select user_id, adate, state 
            from fact_all
            )
        group by user_id, adate, state
    )
    SELECT user_id, adate, user_id_adate, state FROM df_temp
    WHERE rn = 1
;

Parent Materialized View:

create materialized view fact_all_mv distkey(user_id_adate) sortkey(user_id_adate) as 
SELECT subquery1.user_id, 
       subquery1.adate,
       subquery1.user_id_adate,
       subquery1.vscore_4, 
       subquery1.i_12_mths, 
       subquery1.user_age_archive, 
       subquery1.user_iscore,
       subquery2.month_since_last_mtg_open, 
       subquery3.zip_code,       
       subquery4.state
From subquery1
LEFT JOIN subquery2 ON subquery1.user_id_adate=subquery2.user_id_adate
LEFT JOIN subquery3 ON subquery1.user_id_adate=subquery3.user_id_adate
LEFT JOIN subquery4 ON subquery1.user_id_adate=subquery4.user_id_adate
;
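
With the parent MV in place, end users can query it like any other relation. The filter value below is hypothetical:

-- Pull the precomputed results directly from the nested MV.
SELECT user_id, adate, vscore_4, zip_code, state
FROM fact_all_mv
WHERE adate = '2023-01-31';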
