【以下的问题经过翻译处理】 我有一个 Glue ETL 脚本,最终将通过S3 event trigger和Lambda执行。但目前我只是定期运行它。如果没有新数据,脚本运行会生成错误,因为我的转换过程有空的 DataFrames。
我用的是Bookmarks。我的摄取和导出部分都在使用转换上下文。但我的普通转换代码没有包装在任何 if 语句或类似的东西中。
我的代码大概是这样:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import to_timestamp, to_date, date_format
from awsglue.dynamicframe import DynamicFrame
import pyspark.sql.functions as f
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# 数据目录:数据库和表名
db_name = "my-database"
tbl_name = "my-table"
# 为 S3 桶生成的脚本
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
format_options={"quoteChar": '"', "withHeader": True, "separator": ","},
connection_type="s3",
format="csv",
connection_options={"paths": ["s3://my-bucket/my-folder"], "recurse": True},
transformation_ctx="S3bucket_node1",
)
df = S3bucket_node1.toDF()
df = df.drop("col6")
df = df.withColumn("time", f.unix_timestamp("time", 'dd-MM-yyyy HH:mm:ss') * 1000)
df = df.withColumn("timegmt", f.unix_timestamp("timegmt", 'dd-MM-yyyy HH:mm:ss') * 1000)
df = df.withColumn("value", df.VALUE.cast('int'))
df = df.withColumn("filename", f.split(f.input_file_name