1 Answer
To solve this problem for the time being, I used an ApplyMapping node to override all datatypes to strings. They can then be cast back later if necessary, which is fine for exploratory analysis and most datatypes, especially when the source is CSV. I'm open to other solutions, though. I believe that for this approach to work, each CSV-to-Parquet conversion needs its own unique output directory in S3.
Revised code example below.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node S3 bucket (source table crawled from the CSV files)
S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
    database="input_database",
    table_name="input_table",
    transformation_ctx="S3bucket_node1",
)
# Script generated for node ApplyMapping: every column is overridden to
# string, including code, reaction1, and reaction2, which the catalog
# typed as long
ApplyMapping_node2 = ApplyMapping.apply(
    frame=S3bucket_node1,
    mappings=[
        ("start", "string", "start", "string"),
        ("stop", "string", "stop", "string"),
        ("patient", "string", "patient", "string"),
        ("encounter", "string", "encounter", "string"),
        ("code", "long", "code", "string"),
        ("system", "string", "system", "string"),
        ("description", "string", "description", "string"),
        ("type", "string", "type", "string"),
        ("category", "string", "category", "string"),
        ("reaction1", "long", "reaction1", "string"),
        ("description1", "string", "description1", "string"),
        ("severity1", "string", "severity1", "string"),
        ("reaction2", "long", "reaction2", "string"),
        ("description2", "string", "description2", "string"),
        ("severity2", "string", "severity2", "string"),
    ],
    transformation_ctx="ApplyMapping_node2",
)
# Script generated for node S3 bucket (Parquet sink; give each
# CSV-to-Parquet conversion its own output path)
S3bucket_node3 = glueContext.getSink(
    path="s3://my_bucket/parquet/output_csv_parquet/",
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[],
    compression="lzo",
    enableUpdateCatalog=True,
    transformation_ctx="S3bucket_node3",
)
S3bucket_node3.setCatalogInfo(
    catalogDatabase="output_database", catalogTableName="output_csv_parquet"
)
S3bucket_node3.setFormat("glueparquet")
S3bucket_node3.writeFrame(ApplyMapping_node2)
job.commit()
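If the string columns need their original types back downstream, they can be cast after the fact. Below is a minimal sketch, assuming it runs in a job with the same Glue setup as above and reusing the output catalog names from the script; the cast list mirrors the three columns that were long in the source.
from pyspark.sql.functions import col

# Read the all-string Parquet table back from the catalog...
strings_dyf = glueContext.create_dynamic_frame.from_catalog(
    database="output_database",
    table_name="output_csv_parquet",
)
# ...and cast the columns that were originally long back to long
typed_df = (
    strings_dyf.toDF()
    .withColumn("code", col("code").cast("long"))
    .withColumn("reaction1", col("reaction1").cast("long"))
    .withColumn("reaction2", col("reaction2").cast("long"))
)
Since each conversion needs its own output prefix, something like a per-table path, e.g. "s3://my_bucket/parquet/" + table_name + "/" (hypothetical naming), keeps the sinks separate.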
answered a year ago