HIVE_BAD_DATA converting BZ2 to Parquet


Hello,

There are similar questions on the forum, and most recommendations are to override the data type, but I wanted to give some context on what I'm trying to achieve in case there's a better solution.

I have a number of large CSV files which I've compressed with BZ2 (splittable compression) so I can convert them to Parquet and query them in Athena. I've done this in Glue using an S3 data source and an S3 output to Parquet with Snappy compression (again splittable), and it works fine in terms of actually completing the conversion and surfacing the tables in Athena. However, there are data quality issues caused by mismatched types: for instance, the crawler detects some id columns on the BZ2 CSVs as string and others as long, and once converted to Parquet, Athena complains about binary data in bigint fields:

HIVE_BAD_DATA: Field diagnosis1's type BINARY in parquet file s3://bucket_name/parquet/run-S3bucket_node3-1-part-block-0-r-00011-snappy.parquet is incompatible with type bigint defined in table schema

I don't want to convert the column to binary, as it is primarily a bigint field (a SNOMED code). I'd appreciate any guidance on how to achieve this, and possibly on improving the sampling of the original crawler so the fields are detected correctly.

The Glue script for the conversion follows; note that I used the visual editor to create the jobs.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
    database="input_database",
    table_name="input_table_name",
    transformation_ctx="S3bucket_node1",
)

# Script generated for node Change Schema (Apply Mapping)
ChangeSchemaApplyMapping_node1672958072003 = ApplyMapping.apply(
    frame=S3bucket_node1,
    mappings=[
        ("id", "string", "id", "string"),
        ("patientid", "string", "patientid", "string"),
        ("providerid", "string", "providerid", "string"),
        ("primarypatientinsuranceid", "string", "primarypatientinsuranceid", "string"),
        (
            "secondarypatientinsuranceid",
            "string",
            "secondarypatientinsuranceid",
            "string",
        ),
        ("departmentid", "long", "departmentid", "long"),
        ("patientdepartmentid", "long", "patientdepartmentid", "long"),
        ("diagnosis1", "long", "diagnosis1", "long"),
        ("diagnosis2", "long", "diagnosis2", "long"),
        ("diagnosis3", "long", "diagnosis3", "long"),
        ("diagnosis4", "long", "diagnosis4", "long"),
        ("diagnosis5", "long", "diagnosis5", "long"),
        ("diagnosis6", "string", "diagnosis6", "string"),
        ("diagnosis7", "string", "diagnosis7", "string"),
        ("diagnosis8", "string", "diagnosis8", "string"),
        ("referringproviderid", "string", "referringproviderid", "string"),
        ("appointmentid", "string", "appointmentid", "string"),
        ("currentillnessdate", "string", "currentillnessdate", "string"),
        ("servicedate", "string", "servicedate", "string"),
        ("supervisingproviderid", "string", "supervisingproviderid", "string"),
        ("status1", "string", "status1", "string"),
        ("status2", "string", "status2", "string"),
        ("statusp", "string", "statusp", "string"),
        ("outstanding1", "long", "outstanding1", "long"),
        ("outstanding2", "long", "outstanding2", "long"),
        ("outstandingp", "long", "outstandingp", "long"),
        ("lastbilleddate1", "string", "lastbilleddate1", "string"),
        ("lastbilleddate2", "string", "lastbilleddate2", "string"),
        ("lastbilleddatep", "string", "lastbilleddatep", "string"),
        ("healthcareclaimtypeid1", "long", "healthcareclaimtypeid1", "long"),
        ("healthcareclaimtypeid2", "long", "healthcareclaimtypeid2", "long"),
    ],
    transformation_ctx="ChangeSchemaApplyMapping_node1672958072003",
)

# Script generated for node AWS Glue Data Catalog
AWSGlueDataCatalog_node1672991131491 = glueContext.write_dynamic_frame.from_catalog(
    frame=ChangeSchemaApplyMapping_node1672958072003,
    database="output_database",
    table_name="output_table_name",
    additional_options={
        "enableUpdateCatalog": True,
        "updateBehavior": "UPDATE_IN_DATABASE",
    },
    transformation_ctx="AWSGlueDataCatalog_node1672991131491",
)

job.commit()
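For reference, my understanding of the "override the data type" suggestions on the similar questions is a ResolveChoice step inserted before the write node, casting the ambiguous columns to long. A rough, untested sketch of what that might look like in the script above (the node name is a placeholder, and I'm assuming diagnosis1/diagnosis2 are the columns the catalog reports as a string/long choice):

# Hypothetical ResolveChoice step (not part of the generated job above):
# force the ambiguous diagnosis columns to long. Values that cannot be cast
# would come back as null, which is why I haven't adopted this yet.
ResolveDiagnosis_node = ResolveChoice.apply(
    frame=ChangeSchemaApplyMapping_node1672958072003,
    specs=[("diagnosis1", "cast:long"), ("diagnosis2", "cast:long")],
    transformation_ctx="ResolveDiagnosis_node",
)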
asked a year ago · 233 views
1 Answer

To solve this problem for the time being, I have used the Apply Mapping node to override all data types to strings. I can then cast them later if necessary, which is fine for exploratory analysis and most data types, especially when coming from CSV. However, I'm open to other solutions. I believe that for this approach to work you need a unique output directory in S3 for each CSV-to-Parquet translation.

Revised code example below.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
    database="input_database",
    table_name="input_table",
    transformation_ctx="S3bucket_node1",
)

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
    frame=S3bucket_node1,
    mappings=[
        ("start", "string", "start", "string"),
        ("stop", "string", "stop", "string"),
        ("patient", "string", "patient", "string"),
        ("encounter", "string", "encounter", "string"),
        ("code", "long", "code", "string"),
        ("system", "string", "system", "string"),
        ("description", "string", "description", "string"),
        ("type", "string", "type", "string"),
        ("category", "string", "category", "string"),
        ("reaction1", "long", "reaction1", "string"),
        ("description1", "string", "description1", "string"),
        ("severity1", "string", "severity1", "string"),
        ("reaction2", "long", "reaction2", "string"),
        ("description2", "string", "description2", "string"),
        ("severity2", "string", "severity2", "string"),
    ],
    transformation_ctx="ApplyMapping_node2",
)

# Script generated for node S3 bucket
S3bucket_node3 = glueContext.getSink(
    path="s3://my_bucket/parquet/output_csv_parquet/",
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[],
    compression="lzo",
    enableUpdateCatalog=True,
    transformation_ctx="S3bucket_node3",
)
S3bucket_node3.setCatalogInfo(
    catalogDatabase="output_database", catalogTableName="output_csv_parquet"
)
S3bucket_node3.setFormat("glueparquet")
S3bucket_node3.writeFrame(ApplyMapping_node2)
job.commit()
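To give a rough idea of the "cast them later" step: once everything has landed as strings, a typed view can be produced in Spark (or with CAST in an Athena query). A minimal sketch, assuming a Spark session and the same output path as above; the column names are just examples from my mapping, and strings that aren't valid numbers simply become null after the cast:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Read the all-string Parquet output and cast the numeric-looking columns back.
df = spark.read.parquet("s3://my_bucket/parquet/output_csv_parquet/")
typed = (
    df.withColumn("code", col("code").cast("bigint"))
      .withColumn("reaction1", col("reaction1").cast("bigint"))
      .withColumn("reaction2", col("reaction2").cast("bigint"))
)
typed.printSchema()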
answered a year ago
