当没有数据时,如何不执行ETL作业脚本?

0

【以下的问题经过翻译处理】 我有一个 Glue ETL 脚本,最终将通过S3 event trigger和Lambda执行。但目前我只是定期运行它。如果没有新数据,脚本运行会生成错误,因为我的转换过程有空的 DataFrames。

我用的是Bookmarks。我的摄取和导出部分都在使用转换上下文。但我的普通转换代码没有包装在任何 if 语句或类似的东西中。

我的代码大概是这样:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import to_timestamp, to_date, date_format
from awsglue.dynamicframe import DynamicFrame
import pyspark.sql.functions as f

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# 数据目录:数据库和表名
db_name = "my-database"
tbl_name = "my-table"

# 为 S3 桶生成的脚本
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={"quoteChar": '"', "withHeader": True, "separator": ","},
    connection_type="s3",
    format="csv",
    connection_options={"paths": ["s3://my-bucket/my-folder"], "recurse": True},
    transformation_ctx="S3bucket_node1",
)

df = S3bucket_node1.toDF()
df = df.drop("col6") 

df = df.withColumn("time", f.unix_timestamp("time", 'dd-MM-yyyy HH:mm:ss') * 1000)
df = df.withColumn("timegmt", f.unix_timestamp("timegmt", 'dd-MM-yyyy HH:mm:ss') * 1000)
df = df.withColumn("value", df.VALUE.cast('int'))
df = df.withColumn("filename", f.split(f.input_file_name
profile picture
EXPERTE
gefragt vor 6 Monaten7 Aufrufe
1 Antwort
0

【以下的回答经过翻译处理】 我不喜欢下面这种方法,因为它会引起额外的开销,但我不确定是否有更好的方式:在转换回dynamic frame之前,添加一个数据框是否为空的检查。另一种常见的方式是计算dataframe中的行数,如果不为零则继续,但是使用take的方法更加高效,因为它评估的分区/行数较少。

if df.take(1):
    <do your write command>

job.commit()
profile picture
EXPERTE
beantwortet vor 6 Monaten

Du bist nicht angemeldet. Anmelden um eine Antwort zu veröffentlichen.

Eine gute Antwort beantwortet die Frage klar, gibt konstruktives Feedback und fördert die berufliche Weiterentwicklung des Fragenstellers.

Richtlinien für die Beantwortung von Fragen