Loading a CSV file with unquoted strings into RDS using Glue


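[The following question has been translated] I have a PySpark script generated by my Glue job that reads data from a CSV file in an S3 bucket and writes it to a table in my SQL RDS database. The CSV file contains multi-line strings. When those strings are enclosed in quotes they are parsed correctly and the job succeeds, but in my case the multi-line strings are not quoted, so they are not parsed correctly and the job fails to insert the data into my table. I tried: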

spark.read.option("multiLine", "true").option("quoteChar", -1).option("header","true")

but that did not work. I also tried:

datasink5 = glueContext.write_dynamic_frame.from_options(
    frame = dynamic_frame_write,
    connection_type = "s3", 
    connection_options = {
        "path": "s3://mycsvFile"
        }, 
    format = "csv", 
    format_options={
        "quoteChar": -1, 
        "separator": ","
        }, 
    transformation_ctx = "datasink5")

but this writes the data back to S3 rather than to my RDS table.

Here is my Glue job:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
import pyspark.sql.functions as f

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
## spark.read.option("multiLine", "true").option("quoteChar", -1).option("header","true").option("escape","\'")
    
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

def otherTreatment(dfa):
...
   return dfa

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db_rds", table_name = "tbl_csv_extract", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("id", "string", "id", "string"), ("created", "string", "created", "timestamp"), ("name", "string", "name", "string"), ("high", "string", "high", "decimal(22,7)")], transformation_ctx = "applymapping1")

selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["created", "name", "high", "id"], transformation_ctx = "selectfields2")

resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "db_rds_sql", table_name = "tbl_teststring", transformation_ctx = "resolvechoice3")

resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")

data_frame = resolvechoice4.toDF()



data_frame = otherTreatment(data_frame)
dynamic_frame_write = DynamicFrame.fromDF(data_frame, glueContext, "dynamic_frame_write")
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = dynamic_frame_write, database = "db_rds_sql", table_name = "tbl_teststring", transformation_ctx = "datasink5")

## the following block writes the output back to S3, not to my SQL table
datasink5 = glueContext.write_dynamic_frame.from_options(
    frame = dynamic_frame_write,
    connection_type = "s3", 
    connection_options = {
        "path": "s3://mycsvFile"
        }, 
    format = "csv", 
    format_options={
        "quoteChar": -1, 
        "separator": ","
        }, 
    transformation_ctx = "datasink5")
    
job.commit()

My question: how can I use Glue PySpark to load my multi-line CSV file whose strings are not quoted?

1 Answer

[The following answer has been translated] I think your most likely solution is to add quotes around the strings.

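Put another way: when you say "in my case the multi-line strings are not quoted", you are effectively saying "my CSV file is invalid". A multi-line string that is not quoted is not valid CSV, because a parser has no way, even in principle, to tell a line break inside a field from the end of a record.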
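To make the point concrete, here is a small sketch using Python's standard csv module rather than Glue or Spark. The sample rows and column names are made up for illustration. It only shows that the same data parses as one record when the multi-line field is quoted, and falls apart into two broken rows when it is not.

```python
import csv
import io

# Hypothetical sample data: the same record, once without and once with quotes.
unquoted = "id,name,high\n1,first line\nsecond line,3.5\n"
quoted = 'id,name,high\n1,"first line\nsecond line",3.5\n'


def parse(text):
    """Parse CSV text into a list of rows with the standard library reader."""
    return list(csv.reader(io.StringIO(text, newline="")))


unquoted_rows = parse(unquoted)
quoted_rows = parse(quoted)

# Unquoted: the line break is treated as the end of a record,
# so one logical row becomes two rows with too few columns each.
print(unquoted_rows)

# Quoted: the line break stays inside the "name" field.
print(quoted_rows)
```

Any CSV reader faces the same ambiguity, so changing reader options alone is unlikely to recover the intended rows.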

You will probably need to do one of the following:

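Regenerate the CSV from the original source so that you get a valid CSV file.

If you know the data well enough to work out where the quotes belong, parse the strings and add the quotes yourself.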
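As a rough sketch of the second option, the snippet below repairs such a file with plain Python before Glue reads it. It rests on assumptions that are not stated in the question and that you would need to check against your data: every record starts with a numeric id followed by a comma, the columns are in the order id, created, name, high, and name is the only free-text column (so it may contain line breaks and commas). The function name repair_csv and the sample data are hypothetical.

```python
import csv
import io
import re

# Assumption: a new record always begins with a numeric id and a comma.
RECORD_START = re.compile(r"^\d+,")


def repair_csv(raw_text, leading=2, trailing=1):
    """Rewrite CSV text so the single free-text column is properly quoted.

    leading  = number of well-behaved columns before the free-text column
    trailing = number of well-behaved columns after it
    """
    lines = raw_text.splitlines()
    header, body = lines[0], lines[1:]

    # Glue physical lines back together into logical records.
    records = []
    for line in body:
        if RECORD_START.match(line) or not records:
            records.append(line)
        else:
            records[-1] += "\n" + line

    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(header.split(","))
    for record in records:
        head = record.split(",", leading)   # leading fields plus the remainder
        rest = head.pop()
        tail = rest.rsplit(",", trailing)   # free text plus trailing fields
        free_text = tail.pop(0)
        # csv.writer quotes any field containing a newline or a comma.
        writer.writerow(head + [free_text] + tail)
    return out.getvalue()


sample = (
    "id,created,name,high\n"
    "1,2020-01-01,first line\n"
    "second, with comma,3.5\n"
    "2,2020-01-02,plain,4.0\n"
)
repaired = repair_csv(sample)
print(repaired)
```

The repaired text could then be written back to S3 and read with the multi-line option enabled. Note that this heuristic breaks if a continuation line inside the text itself starts with digits and a comma, which is why regenerating the file from the source is the safer choice when it is possible.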

EXPERT
answered 6 months ago
