Complex Materialized View creation using Redshift Nested Materialized Views
We will demonstrate how to use nested materialized views in Amazon Redshift in situations where the base materialized view SQL is complex and contains many large subqueries, making the view difficult to create.
This post explores an example of how customers can use Redshift to sift through massive amounts of data to gain valuable insights. The calculations performed by the example SQL are quite complex and require scanning tens of terabytes of data. Running these calculations as an ad hoc SQL statement every time would take far too long to be practical for the team. Putting this SQL behind a materialized view, however, lets the customer team pull these results quickly and easily, since the materialized view performs most of the scanning, aggregation, and joining of the data up front:
SELECT A.user_id,
A.adate,
A.vscore_4,
A.i_12_mths,
A.user_age_archive,
A.user_iscore,
Aa.month_since_last_mtg_open,
Ab.zip_code,
Ac.state
FROM
(
SELECT distinct user_id, adate,
median(vscore_4) OVER (PARTITION BY user_id, adate) as vscore_4,
median(i_12_mths) OVER (PARTITION BY user_id, adate) as i_12_mths,
median(user_age_archive) OVER (PARTITION BY user_id, adate) as user_age_archive,
median(user_iscore) OVER (PARTITION BY user_id, adate) as user_iscore
FROM fact_all
WHERE stability < 100000000 and high_standing < 100000000
) as A
LEFT JOIN
(
SELECT user_id, adate,
12*(date_part(year, adate)-trunc(max(start_date)/100)) + date_part(month, adate)-(max(start_date)-trunc(max(start_date)/100)) as month_since_last_mtg_open
FROM fact_all
WHERE classification = 'FM' and stability < 100000000 and high_standing < 100000000
GROUP BY user_id, adate
) as Aa
ON A.user_id = Aa.user_id and A.adate = Aa.adate
LEFT JOIN
(
SELECT distinct user_id, adate,
median(cast(zip_code as int)) OVER (PARTITION BY user_id, adate) as zip_code
FROM fact_all
WHERE zip_code!='' and stability < 100000000 and high_standing < 100000000
) as Ab
ON A.user_id = Ab.user_id and A.adate = Ab.adate
LEFT JOIN
(
WITH df_temp AS
(SELECT user_id, adate, state,
ROW_NUMBER() OVER (PARTITION BY user_id, adate ORDER BY count(state) DESC) AS rn
FROM
(
select user_id, adate, state
from fact_all
) as s
group by user_id, adate, state
)
SELECT user_id, adate, state FROM df_temp
WHERE rn = 1
) as Ac
ON A.user_id = Ac.user_id and A.adate = Ac.adate
We can see this SQL consists of 4 subqueries, each of which reads the same base table fact_all, which is 34 TB in size. The result set from each subquery is also quite large, and these result sets are then joined to produce the combined results. When this SQL was put into a materialized view, the creation failed after 24 hours, even on a very large cluster. Why was this the case? Let us look further.
As we can see from a snippet of the query plan below, the joining of the subquery results is where the problem arises. These joins end up as multiple broadcasts, a very expensive operation that copies the values of a result set to all nodes in the cluster. See the Amazon Redshift documentation for a detailed explanation of data redistribution and evaluating query plans.
XN Hash Left Join DS_BCAST_INNER (cost=6346975720280.07..45136252742305562230784.00 rows=563980926304913522688 width=252)
Hash Cond: (("outer".adate = "inner".adate) AND ("outer".user_id = "inner".user_id))
-> XN Hash Left Join DS_BCAST_INNER (cost=5334882864421.78..12135809249447387136.00 rows=151659946706240544 width=186)
Hash Cond: (("outer".adate = "inner".adate) AND ("outer".user_id = "inner".user_id))
-> XN Hash Left Join DS_BCAST_INNER (cost=4265524403175.25..260676123715712.22 rows=2946900245290 width=154)
Hash Cond: (("outer".adate = "inner".adate) AND ("outer".user_id = "inner".user_id))
-> XN Subquery Scan a (cost=4260211338128.82..4275848531933.52 rows=2058623683 width=146)
-> XN Unique (cost=4260211338128.82..4275827945696.69 rows=2058623683 width=84)
Broadcast joins occur when the data is not collocated. This problem is usually addressed by configuring proper distribution keys on the underlying tables. In this case, however, what is being joined are the result sets of the subqueries, not the tables themselves. Each subquery involves a subquery scan on top of window functions and DISTINCT, and distribution keys do not persist through this layer, so the broadcasts occur. This became a significant hurdle in creating the materialized view: it timed out after running for 24 hours on a provisioned cluster, and even on a serverless endpoint with maximum RPUs it timed out after 6 hours. The subqueries themselves did not take long to run; it is the joining that causes the major bottleneck, since DS_BCAST_INNER copies the entire inner table to all the compute nodes.
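Before restructuring a query like this, it is worth confirming where the redistribution happens without actually running it. A minimal sketch (the subqueries here are simplified stand-ins for the ones above):

```sql
-- EXPLAIN shows the plan without executing the query. In the output,
-- check each join operator's redistribution attribute:
--   DS_DIST_NONE                   -> collocated join, no data movement
--   DS_BCAST_INNER / DS_DIST_BOTH  -> expensive redistribution
EXPLAIN
SELECT a.user_id, a.adate
FROM (SELECT DISTINCT user_id, adate FROM fact_all) AS a
LEFT JOIN (SELECT DISTINCT user_id, adate FROM fact_all) AS b
  ON a.user_id = b.user_id AND a.adate = b.adate;
```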
Solution Summary
To solve this problem, we used nested materialized views to apply distribution keys to the intermediate result sets. Take a look at this alternate approach to create the same materialized view shown above, now using 2 separate steps:

1. Create a materialized view for each subquery.
2. Create the nested MV, which joins the subquery MVs (see appendix for full code).
Each subquery now becomes a materialized view. The key advantage of this approach is that we can persist the distribution key of the subquery result sets, right in the materialized view definition itself. Just like tables, Redshift materialized views can have both sort keys and distribution keys defined as attributes. Once the subquery materialized views are created with the join column as the distribution key, we create the nested materialized view to join them together. The join now ends up being an extremely fast operation: no redistribution is required, since the data for the joins is collocated. The initial view creation failed after 24 hours, but this nested MV approach took less than 2 hours! For a deeper discussion of data redistribution, refer to the Amazon Redshift Developer Guide.
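Because a materialized view is backed by a physical table, you can verify that the distribution and sort settings actually persisted. A hedged sketch using the SVV_TABLE_INFO system view (the backing table's name may differ from the MV name depending on the Redshift version, so the LIKE pattern here is an assumption):

```sql
-- Confirm the diststyle/sortkey of the MVs' backing tables.
SELECT "table", diststyle, sortkey1
FROM svv_table_info
WHERE "table" LIKE '%subquery%';
```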
As an additional benefit, this method allows a parallel approach. Since step #1 entails multiple materialized views created independently of each other, they can all be created in parallel using multiple threads. If the cluster has enough compute capacity to handle the processing, all the MVs in step #1 can be created simultaneously. This saves even more time, as we are now bound only by the longest subquery plus the final nested MV creation. See the timings of the different subquery MVs below.
| Operation Name | Duration |
|---|---|
| Consumer level Subquery1 MV | 1 hr 41 min |
| Consumer level Subquery2 MV | 3.5 min |
| Consumer level Subquery3 MV | 1 hr 23 min |
| Consumer level Subquery4 MV | 36 min |
| Consumer level Parent combine | 8 min |
The longest MVs were #1 and #3. All 4 subquery MVs were created in parallel, and the parent combined MV then took only 8 minutes, so the entire create (or refresh) operation takes about 1 hour and 50 minutes. This allowed the customer analytics team to harness their large volumes of data and offer them to end users in a user-friendly, easy-to-consume manner.
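Timings like the ones above do not have to be collected by hand. A hedged sketch: Redshift records materialized view refresh activity in the SVL_MV_REFRESH_STATUS system view (column availability may vary by Redshift version), which can be queried after the fact:

```sql
-- Per-MV refresh history: status plus elapsed time in minutes.
SELECT mv_name, status, starttime, endtime,
       datediff(minute, starttime, endtime) AS duration_min
FROM svl_mv_refresh_status
ORDER BY starttime DESC;
```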
Conclusion
Amazon Redshift’s scalability, performance, and rich capabilities such as nested materialized views allow you to easily and quickly add value to your business by delivering valuable insights with extremely fast performance.
Appendix
Subquery Materialized Views:
create materialized view subquery1 distkey(user_id_adate) sortkey(user_id_adate) as
SELECT distinct user_id, adate, user_id || '_' || adate as user_id_adate,
median(vscore_4) OVER (PARTITION BY user_id, adate) as vscore_4,
median(i_12_mths) OVER (PARTITION BY user_id, adate) as i_12_mths,
median(user_age_archive) OVER (PARTITION BY user_id, adate) as user_age_archive,
median(user_iscore) OVER (PARTITION BY user_id, adate) as user_iscore
FROM fact_all
WHERE stability < 100000000 and high_standing < 100000000
;
create materialized view subquery2 distkey(user_id_adate) sortkey(user_id_adate) as
SELECT user_id, adate, user_id || '_' || adate as user_id_adate,
12*(date_part(year, adate)-trunc(max(origination_date_open)/100)) + date_part(month, adate)-(max(origination_date_open)-trunc(max(origination_date_open)/100)) as month_since_last_mtg_open
FROM fact_all
WHERE classification = 'FM' and stability < 100000000 and high_standing < 100000000
GROUP BY user_id, adate
;
create materialized view subquery3 distkey(user_id_adate) sortkey(user_id_adate) as
SELECT distinct user_id, adate, user_id || '_' || adate as user_id_adate,
median(cast(zip_code as int)) OVER (PARTITION BY user_id, adate) as zip_code
FROM fact_all
WHERE zip_code!='' and stability < 100000000 and high_standing < 100000000
;
create materialized view subquery4 distkey(user_id_adate) sortkey(user_id_adate) as
WITH df_temp AS
(SELECT user_id, adate, user_id || '_' || adate as user_id_adate, state,
ROW_NUMBER() OVER (PARTITION BY user_id, adate ORDER BY count(state) DESC) AS rn
FROM
(
select user_id, adate, state
from fact_all
) as s
group by user_id, adate, state
)
SELECT user_id, adate, user_id_adate, state FROM df_temp
WHERE rn = 1
;
Parent Materialized View:
create materialized view fact_all_mv distkey(user_id_adate) sortkey(user_id_adate) as
SELECT subquery1.user_id,
subquery1.adate,
subquery1.user_id_adate,
subquery1.vscore_4,
subquery1.i_12_mths,
subquery1.user_age_archive,
subquery1.user_iscore,
subquery2.month_since_last_mtg_open,
subquery3.zip_code,
subquery4.state
FROM subquery1
LEFT JOIN subquery2 ON subquery1.user_id_adate=subquery2.user_id_adate
LEFT JOIN subquery3 ON subquery1.user_id_adate=subquery3.user_id_adate
LEFT JOIN subquery4 ON subquery1.user_id_adate=subquery4.user_id_adate
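One operational note, using the MV names above: Redshift does not cascade a refresh through nested materialized views, so the subquery MVs must be refreshed before the parent for the parent to pick up new base-table data:

```sql
-- Refresh children first (these can run concurrently in separate
-- sessions), then the parent MV that selects from them.
refresh materialized view subquery1;
refresh materialized view subquery2;
refresh materialized view subquery3;
refresh materialized view subquery4;
refresh materialized view fact_all_mv;
```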