通过Glue将CSV文件按非引号方式注入RDS
0
【以下的问题经过翻译处理】 我有一个由我的Glue Job生成的pyspark脚本,旨在从S3存储桶中的CSV文件中读取数据并将其写入我的SQL RDS表中。 在我的CSV文件中,我有多行字符串。 如果解析后字符串中引号被正常解析,则作业通过,但在我的情况下,多行字符串中的引号未被正确解析,因此作业无法将数据插入我的表中; 我尝试过:
spark.read.option("multiLine", "true").option("quoteChar", -1).option("header","true")
但它没有起作用。 我还尝试过:
datasink5 = glueContext.write_dynamic_frame.from_options(
frame = dynamic_frame_write,
connection_type = "s3",
connection_options = {
"path": "s3://mycsvFile"
},
format = "csv",
format_options={
"quoteChar": -1,
"separator": ","
},
transformation_ctx = "datasink5")
但这将数据写回S3而不是我的RDS表。
这是我的Glue工作:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
import pyspark.sql.functions as f
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
## spark.read.option("multiLine", "true").option("quoteChar", -1).option("header","true").option("escape","\'")
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
def otherTreatment(dfa):
...
return dfa
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db_rds", table_name = "tbl_csv_extract", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("id", "string", "id", "string"), ("created", "string", "created", "timestamp"), ("name", "string", "name", "string"), ("high", "string", "high", "decimal(22,7)")], transformation_ctx = "applymapping1")
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["created", "name", "high", "id"], transformation_ctx = "selectfields2")
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "db_rds_sql", table_name = "tbl_teststring", transformation_ctx = "resolvechoice3")
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")
data_frame = resolvechoice4.toDF()
data_frame = otherTreatment(data_frame)
dynamic_frame_write = DynamicFrame.fromDF(data_frame, glueContext, "dynamic_frame_write")
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = dynamic_frame_write, database = "db_rds_sql", table_name = "tbl_teststring", transformation_ctx = "datasink5")
## with the flowing script write output back to s3 not in my sql table
datasink5 = glueContext.write_dynamic_frame.from_options(
frame = dynamic_frame_write,
connection_type = "s3",
connection_options = {
"path": "s3://mycsvFile"
},
format = "csv",
format_options={
"quoteChar": -1,
"separator": ","
},
transformation_ctx = "datasink5")
job.commit()
请教大家:如何使用 Glue Pyspark 写入我的不带引号的多行 CSV 文件?
1 回答
- 最新
- 投票最多
- 评论最多
这些答案有用吗?为正确答案投票,以帮助社区从您的知识中受益。
0
【以下的回答经过翻译处理】 我认为您可能的解决方案将是在字符串中添加引号。
可以这样理解:当您提到“在我的情况下,多行字符串没有引号”时,实际上是在说“我的 CSV 文件无效”。因为在 CSV 文件中不引用多行字符串实际上是无效的。解析器原则上无法知道该如何处理。
您可能需要执行以下操作之一:
从原始源重新生成您的 CSV,以获得有效的 CSV。 如果您对数据了解足够多,可以确定引号应该放在哪里,然后进行字符串解析以添加引号。