Error when running a Glue job to write data to a data lake


I am writing a Glue script to take data from S3 (PostgreSQL WAL logs) and write it into a Hudi data lake.

Whenever I try to do that, I get an "unable to upsert data with commit time" error with the following trace:

 org.apache.hudi.exception.HoodieException: ts(Part -ts) field not found in record, Acceptable fields were : [my db columns]

I am not sure where this ts field is coming from.

Pasting the Glue script below for reference. Note that I am trying to write CDC logs to the data lake; it would be great if someone has a Glue script that does this.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
from pyspark.sql.types import DataType, StringType



glue_to_spark_types = {
    "boolean": "boolean",
    "tinyint": "tinyint",
    "smallint": "smallint",
    "int": "int",
    "bigint": "bigint",
    "float": "float",
    "double": "double",
    "decimal": "decimal",
    "string": "string",
    "date": "date",
    "timestamp": "timestamp",
    "array": "array",
    "struct": "struct",
    "map": "map"
}

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

glue_client = boto3.client('glue')

job = Job(glueContext)
job.init(args["JOB_NAME"], args)

tables = glue_client.get_tables(DatabaseName="glue catalog db")

# for table in tables['TableList']:
for table in [{ 'Name': 'tableName' }]:

    table_name = table['Name']
    print("Table", table)

    # Script generated for node S3 bucket
    S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
        database="GLUE CATALOG DB",
        table_name=table_name,
        transformation_ctx="S3bucket_node1",
    )
    table_schema = S3bucket_node1.schema()
    
    mappings = []

    # Iterate over the columns in the schema and generate the mappings
    for field in table_schema.fields:
        print("field", field)
        column_name = field.name
        column_glue_type = field.dataType.typeName()
        print("col_glue_type", column_glue_type, column_name)

        # Get the corresponding Spark SQL data type from the dictionary
        column_spark_type = glue_to_spark_types.get(column_glue_type, "string")
        mappings.append((column_name, column_glue_type, column_name, column_spark_type))


    
    # Script generated for node ApplyMapping
    ApplyMapping_node2 = ApplyMapping.apply(
        frame=S3bucket_node1,
        mappings=mappings,
        transformation_ctx="ApplyMapping_node2",
    )
    
    # Script generated for node S3 bucket
    additional_options = {
        "hoodie.table.name": "asd" + table_name,
        "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.recordkey.field": "id",
        # "hoodie.datasource.write.precombine.field": "modified_at",
        "hoodie.datasource.write.hive_style_partitioning": "true",
        "hoodie.parquet.compression.codec": "gzip",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": "asdasdasdasd",
        "hoodie.datasource.hive_sync.table": "cl__" + table_name,
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
    }
    
    print("Apply mapping", additional_options)
    
    S3bucket_node3_df = ApplyMapping_node2.toDF()
    
    S3bucket_node3_df.write.format("hudi").options(**additional_options).mode(
        "append"
    ).save("s3://asdasdasdasdds/" + table_name + "/")
    
    print("table", table_name)

job.commit()


1 Answer

Hudi requires a column that it can use to deduplicate records inside a batch (if you know duplicates can't happen, then any column will do).
hoodie.datasource.write.precombine.field defaults to "ts", and the write will fail if that column doesn't exist in the record. More information in the Hudi docs: https://hudi.apache.org/docs/basic_configurations#hoodiedatasourcewriteprecombinefield
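
For example, one way to resolve the error in the script above is to point the precombine field at a column that actually exists in the table. The sketch below assumes a modified_at column (the one commented out in the question); any existing column that orders duplicate records, such as an updated-at timestamp, would work just as well.

# Sketch only: "modified_at" is an assumption; use any column from your schema
# that can break ties between duplicate records in the same batch.
additional_options = {
    "hoodie.table.name": "asd" + table_name,
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "id",
    # Without this setting Hudi falls back to the default precombine field "ts",
    # which is not in the record, hence the HoodieException.
    "hoodie.datasource.write.precombine.field": "modified_at",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    # ... keep the remaining compression and hive_sync options from the question
}

S3bucket_node3_df.write.format("hudi").options(**additional_options).mode(
    "append"
).save("s3://asdasdasdasdds/" + table_name + "/")

If the source data genuinely has no suitable column, you can also add one on the fly (for example a load timestamp) to the DataFrame before writing.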

AWS (Expert)
Answered a year ago
