Unable to remove duplicates when writing to Redshift with pre-actions and post-actions using Glue job


I have a Glue PySpark script that processes DynamoDB data exported to S3 and writes it to Redshift. Initially, it used the logic below:

redshiftConnectionOptions = {
    "postactions": "BEGIN; MERGE INTO db.tests USING db.tests_temp_igervy ON tests.code = tests_temp_igervy.code AND tests.class = tests_temp_igervy.class WHEN MATCHED THEN UPDATE SET code = tests_temp_igervy.code, class = tests_temp_igervy.class, updatedAtTimestamp = tests_temp_igervy.updatedAtTimestamp WHEN NOT MATCHED THEN INSERT VALUES (tests_temp_igervy.code, tests_temp_igervy.class, tests_temp_igervy.updatedAtTimestamp); DROP TABLE db.tests_temp_igervy; END;",
    "redshiftTmpDir": "s3://<bucket>/temporary/testing",
    "useConnectionProperties": "true",
    "connectionName": "Redshift connection",
    "dbtable": "db.tests",
    "preactions": "CREATE TABLE IF NOT EXISTS db.tests (code VARCHAR, class VARCHAR, updatedAtTimestamp TIMESTAMP); DROP TABLE IF EXISTS db.tests_temp_igervy; CREATE TABLE db.tests_temp_igervy (code VARCHAR, class VARCHAR, updatedAtTimestamp TIMESTAMP);",
    "url": "<url>",
    "user": redshiftUser,
    "password": redshiftPassword,
    "aws_iam_role": "<iam arn>"
}

This successfully loads data into Redshift, and I get the following:

code        class       updatedAtTimestamp
SGHF864S    QDSD3423    2023-11-17 15:56:25.025
SGHF864S    FGHS7683    2023-11-17 15:56:25.025

However, upon rerunning the same job, it introduces duplicates (even though my Redshift table has (code, class) as the primary key). I am aware that Redshift doesn't enforce primary key uniqueness. To handle this, I tried a ROW_NUMBER() dedup in the postactions:

"""BEGIN; MERGE INTO db.tests USING db.tests_temp_igervy ON tests.code = tests_temp_igervy.code AND tests.class = tests_temp_igervy.class WHEN MATCHED THEN UPDATE SET code = tests_temp_igervy.code, class 
= tests_temp_igervy.class, updatedAtTimestamp = tests_temp_igervy.updatedAtTimestamp WHEN NOT MATCHED THEN INSERT VALUES (tests_temp_igervy.code, tests_temp_igervy.class, tests_temp_igervy.updatedAtTimestamp); DROP TABLE db.tests_temp_igervy; 
WITH cte AS (
            SELECT *,
            ROW_NUMBER() OVER (PARTITION BY code, class ORDER BY (select NULL)) as rn
            FROM db.tests
        )
        DELETE FROM db.tests
        WHERE (code, class) IN (
            SELECT code, class FROM cte WHERE rn > 1
        );
        DROP TABLE db.tests_temp_igervy;
        END;

For this, I received an error saying 'IN' is not supported. So I tried using a JOIN instead of IN with the same logic, but it still introduces duplicates on rerun.
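
For illustration, a join-based delete of the duplicated keys would look roughly like this (a sketch using the names above, not the exact SQL from the job):

-- Materialize the duplicated keys first, then join against them in the DELETE.
CREATE TEMP TABLE dup_keys AS
SELECT code, class
FROM db.tests
GROUP BY code, class
HAVING COUNT(*) > 1;

DELETE FROM db.tests
USING dup_keys
WHERE tests.code = dup_keys.code
  AND tests.class = dup_keys.class;
-- Caveat: every physical copy of a duplicated (code, class) pair matches this
-- join, so the DELETE removes all copies of those pairs rather than keeping
-- exactly one of them.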

Finally, I came across MERGE with REMOVE DUPLICATES, so I implemented it as below, but it still didn't remove duplicates on rerun.

"postactions": "BEGIN; MERGE INTO db.tests USING db.tests_temp_igervy ON (tests.code = tests_temp_igervy.code AND tests.class = tests_temp_igervy.class) REMOVE DUPLICATES;"

Here's how my Redshift table is defined, in case it's relevant:

"CREATE TABLE db.tests(
    code character varying(256) NOT NULL ENCODE lzo distkey,
    class character varying(256) NOT NULL ENCODE lzo,
    updatedattimestamp timestamp without time zone ENCODE az64,
    PRIMARY KEY (code, class)
)
DISTSTYLE AUTO;"

Can anyone share some pointers on how to handle this? TIA!

Asked 4 months ago · Viewed 242 times
1 Answer

Why don't you try using Redshift Spectrum to read the data instead of loading it into Redshift? That's what I'm doing in place of ETL from RDS PostgreSQL snapshot exports.
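
For reference, querying the exported S3 data through Spectrum looks roughly like the sketch below. The external schema name, Glue Data Catalog database, IAM role ARN, and the assumption that the export is cataloged as a table named tests are all placeholders, not values from the question:

-- Register the Glue Data Catalog database holding the S3 export as an external schema.
CREATE EXTERNAL SCHEMA spectrum_ddb
FROM DATA CATALOG
DATABASE 'ddb_export_db'
IAM_ROLE 'arn:aws:iam::<account-id>:role/<spectrum-role>';

-- Query the data in place; duplicates can be collapsed at read time instead of load time.
SELECT code, class, MAX(updatedAtTimestamp) AS updatedAtTimestamp
FROM spectrum_ddb.tests
GROUP BY code, class;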

You can't do a multi-column IN statement, and there's actually no way to delete just one row of a duplicate in Redshift, because identical rows have nothing to distinguish them. You'd have to add a physical row-number column to the table, delete anything with a row number greater than 1, then drop the column.
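
A common way to get the same effect without updating rows in place is to rebuild the table from a deduplicated copy inside one transaction. This is a sketch, not the poster's exact method; it assumes keeping the most recent updatedAtTimestamp per (code, class) is acceptable:

BEGIN;

-- Keep one row per (code, class), preferring the most recent timestamp.
CREATE TEMP TABLE tests_dedup AS
SELECT code, class, updatedAtTimestamp
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY code, class
                              ORDER BY updatedAtTimestamp DESC) AS rn
    FROM db.tests
) ranked
WHERE rn = 1;

-- Swap the deduplicated rows back in.
DELETE FROM db.tests;
INSERT INTO db.tests SELECT * FROM tests_dedup;

COMMIT;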

When you try MERGE with REMOVE DUPLICATES, does the target or source table already have duplicates in it?
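
A quick way to check, using the table names from the question — run it against both db.tests (the target) and db.tests_temp_igervy (the staging source) before the MERGE:

-- Lists (code, class) pairs that appear more than once.
SELECT code, class, COUNT(*) AS copies
FROM db.tests
GROUP BY code, class
HAVING COUNT(*) > 1;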

Answered 3 months ago
