I am writing a Glue script that takes data from S3 (PostgreSQL WAL logs) and writes it into a Hudi data lake.
Whenever I try to do that, I get an "unable to upsert data with commit time" error with the following trace:
org.apache.hudi.exception.HoodieException: ts(Part -ts) field not found in record, Acceptable fields were : [my db columns]
I am not sure where this ts field is coming into scope from.
The Glue script is pasted below for reference. Note that I am trying to write the CDC logs to the data lake; it would be great if someone has a working Glue script for this.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
from pyspark.sql.types import DataType, StringType
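# Mapping from Glue catalog type names to Spark SQL type names, used when building the ApplyMapping tuples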
glue_to_spark_types = {
"boolean": "boolean",
"tinyint": "tinyint",
"smallint": "smallint",
"int": "int",
"bigint": "bigint",
"float": "float",
"double": "double",
"decimal": "decimal",
"string": "string",
"date": "date",
"timestamp": "timestamp",
"array": "array",
"struct": "struct",
"map": "map"
}
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
glue_client = boto3.client('glue')
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
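# List the tables registered in the Glue Data Catalog database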
tables = glue_client.get_tables(DatabaseName="glue catalog db")
# for table in tables['TableList']:
for table in [{ 'Name': 'tableName' }]:
    table_name = table['Name']
    print("Table", table)
    # Script generated for node S3 bucket
    S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
        database="GLUE CATALOG DB",
        table_name=table_name,
        transformation_ctx="S3bucket_node1",
    )
    table_schema = S3bucket_node1.schema()
    mappings = []
    # Iterate over the columns in the schema and generate the mappings
    for field in table_schema.fields:
        print("field", field)
        column_name = field.name
        column_glue_type = field.dataType.typeName()
        print("col_glue_type", column_glue_type, column_name)
        # Get the corresponding Spark SQL data type from the dictionary
        column_spark_type = glue_to_spark_types.get(column_glue_type, "string")
        mappings.append((column_name, column_glue_type, column_name, column_spark_type))
    # Script generated for node ApplyMapping
    ApplyMapping_node2 = ApplyMapping.apply(
        frame=S3bucket_node1,
        mappings=mappings,
        transformation_ctx="ApplyMapping_node2",
    )
    # Hudi write options for the S3 sink
    additional_options = {
        "hoodie.table.name": "asd" + table_name,
        "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.recordkey.field": "id",
        # "hoodie.datasource.write.precombine.field": "modified_at",
        "hoodie.datasource.write.hive_style_partitioning": "true",
        "hoodie.parquet.compression.codec": "gzip",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": "asdasdasdasd",
        "hoodie.datasource.hive_sync.table": "cl__" + table_name,
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
    }
print("Apply mapping", additional_options)
    S3bucket_node3_df = ApplyMapping_node2.toDF()
    S3bucket_node3_df.write.format("hudi").options(**additional_options).mode(
        "append"
    ).save("s3://asdasdasdasdds/" + table_name + "/")
    print("table", table_name)
job.commit()