UDF stored in S3 unable to run in a Glue job


I'm trying to use a UDF to convert a column to uppercase. The UDF is in a .zip file in an S3 bucket, which the Glue job's Python script accesses; the script that references the UDF is attached below. ISSUE: The UDF is neither reading the data from the provided S3 bucket nor storing the data in the expected S3 bucket.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Import the UDF code from Amazon S3
spark.sparkContext.addPyFile("s3://**********")
from mylibrary import udf_uppercase

# Create a Spark context
sc = SparkContext()

# Create a Glue context
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Define the AWS Glue job
job = Job(glueContext)
job.init("uppercase_testUDF", args)

# Read data from S3
datasource0 = glueContext.create_dynamic_frame.from_options(
    "s3", 
    {"paths": ["s3://***********"]},
    format="json")

# Apply the UDF to the data
transformed_data = datasource0.toDF().withColumn("column_name", udf_uppercase(datasource0["column_name"]))

transformed_data.show()
# Write the transformed data back to S3
glueContext.write_dynamic_frame.from_options(
    frame=transformed_data,
    connection_type="s3",
    connection_options={
        "path": "s3://*****************",
        "format": "json"
    },
    transformation_ctx="test_uppercase_bookmark"
)

# Run the AWS Glue job
job.commit()

2 Answers
  1. Your UDF file should be at the root level of your zip file if you are not making a Python package.
  2. Instead of importing like from mylibrary import udf_uppercase, just use import udf_uppercase, given that udf_uppercase.py is your UDF file name (see the sketch below).
  3. Try using the Python library path text box in the job properties tab and give the full path, e.g. s3://BucketName/utils/utils.zip.

Refer to: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python-libraries.html
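
For illustration only, a minimal sketch of what a matching udf_uppercase.py at the root of the zip could look like; the file name, function name, and null handling here are assumptions, not the actual library:

# udf_uppercase.py -- hypothetical module sitting at the root of the zip in S3
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def _to_upper(value):
    # Null-safe uppercase so the UDF does not fail on missing values
    return value.upper() if value is not None else None

# Spark UDF object that the job script imports and applies with withColumn
udf_uppercase = udf(_to_upper, StringType())

With that layout, the job script would use import udf_uppercase and call the function as udf_uppercase.udf_uppercase(df["column_name"]).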

answered a year ago

That should work (assuming your file is named mylibrary.py), so the import itself can work.
The issue I see is that you convert to a DataFrame, add a column (still a DataFrame), and then pass it to write_dynamic_frame, which expects a DynamicFrame.

You should receive an error like this: TypeError: frame_or_dfc must be DynamicFrame or DynamicFrameCollection. Got <class 'pyspark.sql.dataframe.DataFrame'>
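
A minimal fix, sketched under the assumption that transformed_data is the DataFrame produced in the script above (the output path is a placeholder), is to wrap it back into a DynamicFrame before writing:

from awsglue.dynamicframe import DynamicFrame

# Convert the DataFrame back into a DynamicFrame so write_dynamic_frame accepts it
transformed_dyf = DynamicFrame.fromDF(transformed_data, glueContext, "transformed_dyf")

glueContext.write_dynamic_frame.from_options(
    frame=transformed_dyf,
    connection_type="s3",
    connection_options={"path": "s3://your-output-bucket/prefix/"},  # placeholder path
    format="json",
    transformation_ctx="test_uppercase_bookmark"
)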

BTW, I guess that's just an example, but it's much more efficient not to use UDFs and to do the uppercase conversion with a built-in SQL function.
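
For example, assuming the same datasource0 from the question, the built-in upper column function does the same job without a Python UDF:

from pyspark.sql.functions import col, upper

# upper() runs as a native Spark expression, avoiding Python UDF serialization overhead
transformed_data = datasource0.toDF().withColumn("column_name", upper(col("column_name")))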

AWS
EXPERT
answered a year ago
