通过Glue将CSV文件按非引号方式注入RDS

0

【以下的问题经过翻译处理】 我有一个由我的Glue Job生成的pyspark脚本,旨在从S3存储桶中的CSV文件中读取数据并将其写入我的SQL RDS表中。 在我的CSV文件中,我有多行字符串。 如果解析后字符串中引号被正常解析,则作业通过,但在我的情况下,多行字符串中的引号未被正确解析,因此作业无法将数据插入我的表中; 我尝试过:

spark.read.option("multiLine", "true").option("quoteChar", -1).option("header","true")

但它没有起作用。 我还尝试过:

 datasink5 = glueContext.write_dynamic_frame.from_options(
    frame = dynamic_frame_write,
    connection_type = "s3", 
    connection_options = {
        "path": "s3://mycsvFile"
        }, 
    format = "csv", 
    format_options={
        "quoteChar": -1, 
        "separator": ","
        }, 
    transformation_ctx = "datasink5")

但这将数据写回S3而不是我的RDS表。

这是我的Glue工作:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
import pyspark.sql.functions as f

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
## spark.read.option("multiLine", "true").option("quoteChar", -1).option("header","true").option("escape","\'")
    
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

def otherTreatment(dfa):
...
   return dfa

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db_rds", table_name = "tbl_csv_extract", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("id", "string", "id", "string"), ("created", "string", "created", "timestamp"), ("name", "string", "name", "string"), ("high", "string", "high", "decimal(22,7)")], transformation_ctx = "applymapping1")

selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["created", "name", "high", "id"], transformation_ctx = "selectfields2")

resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "db_rds_sql", table_name = "tbl_teststring", transformation_ctx = "resolvechoice3")

resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")

data_frame = resolvechoice4.toDF()



data_frame = otherTreatment(data_frame)
dynamic_frame_write = DynamicFrame.fromDF(data_frame, glueContext, "dynamic_frame_write")
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = dynamic_frame_write, database = "db_rds_sql", table_name = "tbl_teststring", transformation_ctx = "datasink5")

## with the flowing script write output back to s3 not in my sql table
datasink5 = glueContext.write_dynamic_frame.from_options(
    frame = dynamic_frame_write,
    connection_type = "s3", 
    connection_options = {
        "path": "s3://mycsvFile"
        }, 
    format = "csv", 
    format_options={
        "quoteChar": -1, 
        "separator": ","
        }, 
    transformation_ctx = "datasink5")
    
job.commit()

请教大家:如何使用 Glue Pyspark 写入我的不带引号的多行 CSV 文件?

profile picture
专家
已提问 9 个月前119 查看次数
1 回答
0

【以下的回答经过翻译处理】 我认为您可能的解决方案将是在字符串中添加引号。

可以这样理解:当您提到“在我的情况下,多行字符串没有引号”时,实际上是在说“我的 CSV 文件无效”。因为在 CSV 文件中不引用多行字符串实际上是无效的。解析器原则上无法知道该如何处理。

您可能需要执行以下操作之一:

从原始源重新生成您的 CSV,以获得有效的 CSV。 如果您对数据了解足够多,可以确定引号应该放在哪里,然后进行字符串解析以添加引号。

profile picture
专家
已回答 9 个月前

您未登录。 登录 发布回答。

一个好的回答可以清楚地解答问题和提供建设性反馈,并能促进提问者的职业发展。

回答问题的准则