AWS Glue: ETL struct field (DynamoDB) to Aurora Postgres JSON field


Several issues:

  1. AWS auto-generates code that lists the struct type as an "object", which fails:

     ChangeSchema_node1685651062990 = ApplyMapping.apply(
         frame=AWSGlueDataCatalog_node1685651050820,
         mappings=[
             ("company_id", "string", "company_id", "string"),
             ("request_model", "string", "request_model", "string"),
             ("created", "string", "created", "timestamp"),
             ("response_id", "string", "response_id", "string"),
             ("usage_total_tokens", "long", "usage_total_tokens", "long"),
             ("usage_request_tokens", "long", "usage_request_tokens", "long"),
             ("usage_refinement_tokens", "long", "usage_refinement_tokens", "long"),
             ("api_key_id", "string", "api_key_id", "string"),
             ("usage_response_tokens", "long", "usage_response_tokens", "long"),
             ("user_id", "string", "user_id", "string"),
             ("response_model", "string", "response_model", "string"),
             ("response_text", "string", "response_text", "string"),
             ("id", "string", "id", "string"),
             ("request_payload", "object", "request_payload", "string"),
             ("request_id", "string", "request_id", "string"),
             ("bot_id", "string", "bot_id", "string"),
         ],
         transformation_ctx="ChangeSchema_node1685651062990",
     )

To clear this I have to change "object" to "string".

  2. But when attempting to write to the PostgreSQL database, it fails because the target field "request_payload" is a json field, so I get the following error: "An error occurred while calling o135.pyWriteDynamicFrame. ERROR: column "request_payload" is of type json but expression is of type character varying"

Is there a way to cast the field as json before it's written to the PostgreSQL DB?

Asked a year ago · 661 views
1 Answer

You can modify your AWS Glue script:

import pyspark.sql.functions as F
dfc = ChangeSchema_node1685651062990.toDF()  # convert dynamic frame to dataframe
dfc = dfc.withColumn("request_payload", F.to_json("request_payload"))  # convert struct to json string
ChangeSchema_node1685651062990 = DynamicFrame.fromDF(dfc, glueContext, "ChangeSchema_node1685651062990")  # convert back to dynamic frame
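For intuition, `F.to_json` does per row what serializing a nested dict into a JSON string does in plain Python. The payload below is illustrative only, not from the actual Glue job:

```python
import json

# An illustrative nested payload, standing in for the DynamoDB struct field.
payload = {"prompt": "hello", "options": {"temperature": 0.7}}

# Serialize the nested structure into a JSON string, which is what
# F.to_json produces for each value of the "request_payload" column.
payload_json = json.dumps(payload)

# Round-trip check: the string parses back into the original structure.
assert json.loads(payload_json) == payload
```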
EXPERT
answered a year ago
  • Script:

    AWSGlueDataCatalog_node1685651050820 = glueContext.create_dynamic_frame.from_catalog(
        database="dynamodb-to-analyticsdb",
        table_name="prompthistory",
        transformation_ctx="AWSGlueDataCatalog_node1685651050820",
    )

    PromptHistory_df = AWSGlueDataCatalog_node1685651050820.toDF()
    PromptHistory_df = PromptHistory_df.withColumn("request_payload", F.to_json("request_payload"))
    PromptHistory_df = PromptHistory_df.withColumn("created", F.to_timestamp("created"))
    AWSGlueDataCatalog_node1685651050820 = DynamicFrame.fromDF(PromptHistory_df, glueContext, "AWSGlueDataCatalog_node1685651050820")

    AWSGlueDataCatalog_node1685651092780 = glueContext.write_dynamic_frame.from_catalog(
        frame=AWSGlueDataCatalog_node1685651050820,
        database="ablt-ai-analytics-12-14",
        table_name="postgres_public_prompthistory_06e0ad580a8f6d2ff0e57e074d377329",
        transformation_ctx="AWSGlueDataCatalog_node1685651092780",
    )

    job.commit()

  • This is close to what I need, but there are still several issues:

    1. This actually has to be done one level up, on "AWSGlueDataCatalog_node1685651050820" rather than on "ChangeSchema_node1685651062990", because to_json will not accept a string as an input type but will accept a struct type.

    2. So I rewrote the function, but the write still fails. Checking the schemas, both the DataFrame and the DynamicFrame list the data type as string, so when I write to Postgres I still get the error: "An error occurred while calling o110.pyWriteDynamicFrame. ERROR: column "request_payload" is of type json but expression is of type character varying"
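One common workaround for exactly this error (a sketch, not from the original thread): the PostgreSQL JDBC driver parameter `stringtype=unspecified` makes the driver send string values as untyped literals, so Postgres can implicitly cast the varchar value into the json column. Host, database, table, and credential names below are placeholders, not values from the post:

```python
# Placeholder JDBC connection options; <host>, <db>, <user>, and <password>
# must be replaced with real values. The key part is appending
# stringtype=unspecified to the URL so Postgres casts the JSON string itself.
connection_options = {
    "url": "jdbc:postgresql://<host>:5432/<db>?stringtype=unspecified",
    "dbtable": "public.prompthistory",
    "user": "<user>",
    "password": "<password>",
}

# In the Glue job the frame would then be written with from_options instead
# of from_catalog (sketch; requires the Glue runtime, so commented out here):
# glueContext.write_dynamic_frame.from_options(
#     frame=AWSGlueDataCatalog_node1685651050820,
#     connection_type="postgresql",
#     connection_options=connection_options,
# )
```

Alternatively, the implicit cast can be allowed on the database side by changing the column type or adding a cast in a staging step; the JDBC URL parameter is usually the least invasive option.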
