I have a Glue PySpark script that processes DynamoDB data exported to S3 and writes it to Redshift. Initially, it used the logic below:
redshiftConnectionOptions = {
    "url": "<url>",
    "user": redshiftUser,
    "password": redshiftPassword,
    "aws_iam_role": "<iam arn>",
    "useConnectionProperties": "true",
    "connectionName": "Redshift connection",
    "redshiftTmpDir": "s3://<bucket>/temporary/testing",
    "dbtable": "db.tests",
    "preactions": "CREATE TABLE IF NOT EXISTS db.tests (code VARCHAR, class VARCHAR, updatedAtTimestamp TIMESTAMP); DROP TABLE IF EXISTS db.tests_temp_igervy; CREATE TABLE db.tests_temp_igervy (code VARCHAR, class VARCHAR, updatedAtTimestamp TIMESTAMP);",
    "postactions": "BEGIN; MERGE INTO db.tests USING db.tests_temp_igervy ON tests.code = tests_temp_igervy.code AND tests.class = tests_temp_igervy.class WHEN MATCHED THEN UPDATE SET code = tests_temp_igervy.code, class = tests_temp_igervy.class, updatedAtTimestamp = tests_temp_igervy.updatedAtTimestamp WHEN NOT MATCHED THEN INSERT VALUES (tests_temp_igervy.code, tests_temp_igervy.class, tests_temp_igervy.updatedAtTimestamp); DROP TABLE db.tests_temp_igervy; END;",
}
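For context, these options are passed to the Glue Redshift writer roughly like this (variable names are illustrative, not my exact script):

from awsglue.context import GlueContext
from pyspark.context import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())

# "dyf" is the DynamicFrame built from the DDB export (name illustrative).
# preactions/postactions in the options run in Redshift before/after the load.
glueContext.write_dynamic_frame.from_options(
    frame=dyf,
    connection_type="redshift",
    connection_options=redshiftConnectionOptions,
)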
This successfully loads data into Redshift, and I get the following:

code      class     updatedAtTimestamp
SGHF864S  QDSD3423  2023-11-17 15:56:25.025
SGHF864S  FGHS7683  2023-11-17 15:56:25.025
However, rerunning the same job introduces duplicates, even though my Redshift table has (code, class) as its primary key. I am aware that Redshift doesn't actually enforce uniqueness for primary keys. To handle this, I tried a ROW_NUMBER() cleanup in the postactions:
"""BEGIN; MERGE INTO db.tests USING db.tests_temp_igervy ON tests.code = tests_temp_igervy.code AND tests.class = tests_temp_igervy.class WHEN MATCHED THEN UPDATE SET code = tests_temp_igervy.code, class
= tests_temp_igervy.class, updatedAtTimestamp = tests_temp_igervy.updatedAtTimestamp WHEN NOT MATCHED THEN INSERT VALUES (tests_temp_igervy.code, tests_temp_igervy.class, tests_temp_igervy.updatedAtTimestamp); DROP TABLE db.tests_temp_igervy;
WITH cte AS (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY code, class ORDER BY (select NULL)) as rn
FROM db.tests
)
DELETE FROM db.tests
WHERE (code, class) IN (
SELECT code, class FROM cte WHERE rn > 1
);
DROP TABLE db.tests_temp_igervy;
END;
For this, I am receiving an error saying 'IN' is not supported. So I tried a JOIN instead of the IN with the same logic, but it still introduced duplicates on rerun.
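Since the CTE + IN form is rejected, the only other in-place option I can think of is rebuilding the table from a deduplicated copy inside the same transaction. This is just a sketch of that idea (the ORDER BY tie-break is arbitrary, since my duplicates are exact copies), not something I've validated at scale:

# Sketch only: rebuild db.tests from a deduplicated copy. DELETE is used
# instead of TRUNCATE because TRUNCATE in Redshift commits implicitly,
# which would break the surrounding BEGIN/END in postactions.
dedupe_sql = """
CREATE TEMP TABLE tests_dedup AS
SELECT code, class, updatedAtTimestamp
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY code, class
                              ORDER BY updatedAtTimestamp DESC) AS rn
    FROM db.tests
) t
WHERE t.rn = 1;

DELETE FROM db.tests;
INSERT INTO db.tests SELECT * FROM tests_dedup;
DROP TABLE tests_dedup;
"""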
Finally, I came across MERGE's 'REMOVE DUPLICATES' option, so I implemented it as below, and it still didn't remove duplicates on rerun:
"postactions": "BEGIN; MERGE INTO db.tests USING db.tests_temp_igervy ON (tests.code = tests_temp_igervy.code AND tests.class = tests_temp_igervy.class) REMOVE DUPLICATES;"
Here's how my Redshift table is defined, in case it's relevant:
"CREATE TABLE db.tests(
code character varying(256) NOT NULL ENCODE lzo distkey,
class character varying(256) NOT NULL ENCODE lzo,
updatedattimestamp timestamp without time zone ENCODE az64,
PRIMARY KEY (code, class)
)
DISTSTYLE AUTO;"
Can anyone share some pointers on how to handle this? TIA!