Error when running a glue job to write data to data lake

0

I am writing a glue script to take data from s3(PSQL WAL LOGS) to write that data into a hudi data lake.

Whenever I am trying to do that I am getting unable to upsert data with commit time error with a trace of

 org.apache.hudi.exception.HoodieException: ts(Part -ts) field not found in record, Acceptable fields were : [my db columns]

I am not sure from where i am getting this ts into the scope.

Pasting the below glue script for reference. Note that i am trying to write the CDC logs to data lake, it will be great if some has a glue script to do that

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
from pyspark.sql.types import DataType, StringType



glue_to_spark_types = {
    "boolean": "boolean",
    "tinyint": "tinyint",
    "smallint": "smallint",
    "int": "int",
    "bigint": "bigint",
    "float": "float",
    "double": "double",
    "decimal": "decimal",
    "string": "string",
    "date": "date",
    "timestamp": "timestamp",
    "array": "array",
    "struct": "struct",
    "map": "map"
}

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

glue_client = boto3.client('glue')

job = Job(glueContext)
job.init(args["JOB_NAME"], args)

tables = glue_client.get_tables(DatabaseName="glue catalog db")

# for table in tables['TableList']:
for table in [{ 'Name': 'tableName' }]:

    table_name = table['Name']
    print("Table", table)

# Script generated for node S3 bucket
    S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
        database="GLUE CATALOG DB",
        table_name=table_name,
        transformation_ctx="S3bucket_node1",
    )
    table_schema = S3bucket_node1.schema()
    
    mappings = []

    # Iterate over the columns in the schema and generate the mappings
    for field in table_schema.fields:
        print("field", field)
        column_name = field.name
        column_glue_type = field.dataType.typeName()
        print("col_glue_type", column_glue_type, column_name)

        # Get the corresponding Spark SQL data type from the dictionary
        column_spark_type = glue_to_spark_types.get(column_glue_type, "string")
        mappings.append((column_name, column_glue_type, column_name, column_spark_type))


    
    # Script generated for node ApplyMapping
    ApplyMapping_node2 = ApplyMapping.apply(
        frame=S3bucket_node1,
        mappings=mappings,
        transformation_ctx="ApplyMapping_node2",
    )
    
    # Script generated for node S3 bucket
    additional_options = {
        "hoodie.table.name": "asd" + table_name,
        "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.recordkey.field": "id",
        # "hoodie.datasource.write.precombine.field": "modified_at",
        "hoodie.datasource.write.hive_style_partitioning": "true",
        "hoodie.parquet.compression.codec": "gzip",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": "asdasdasdasd",
        "hoodie.datasource.hive_sync.table": "cl__" + table_name,
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
    }
    
    print("Apply mapping", additional_options)
    
    S3bucket_node3_df = ApplyMapping_node2.toDF()
    
    S3bucket_node3_df.write.format("hudi").options(**additional_options).mode(
        "append"
    ).save("s3://asdasdasdasdds/" + table_name + "/")
    
    print("table", table_name)

job.commit()


1 Answer
0

Hudi requires a column that it can use to handle duplicates inside the batch (if you know that can't happen then any column will do).
hoodie.datasource.write.precombine.field defaults to "ts" but it will fail if the column doesn't exit. More information in the Hudi docs: https://hudi.apache.org/docs/basic_configurations#hoodiedatasourcewriteprecombinefield

profile pictureAWS
EXPERT
answered a year ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions