如何管理和优化 Iceberg 表以实现高效的数据存储和查询?
我想优化 Apache Iceberg 表,以便实现高效的数据存储和查询。
解决方法
为您的用例选择 Iceberg 表的类型
您选择的解决方法取决于您设置的 Iceberg 表的类型。
使用 CoW(Copy-On-Write,写时复制)表进行读取
在这种类型的 Iceberg 表中,当您对记录进行任何更新或删除操作后,都会在后端重写相应的数据文件。重写会降低性能,尤其是在进行多次更新和删除时。如果您的用例涉及更多读取而不是写入,请使用 CoW 表。
要将现有 Iceberg 表转换为 CoW 表,请运行以下 Apache Spark SQL 命令:
spark.sql("ALTER TABLE <table-name> SET TBLPROPERTIES ('write.delete.mode'='copy-on-write','write.update.mode'='copy-on-write')")
要创建新的 CoW 表,请使用表属性 'write.delete.mode'='copy-on-write','write.update.mode'='copy-on-write':
dataFrame.createOrReplaceTempView("tmp_<your_table_name>") query = f""" CREATE TABLE glue_catalog.<your_database_name>.<your_table_name> USING iceberg TBLPROPERTIES ('format-version'='2','write.delete.mode'='copy-on-write','write.update.mode'='copy-on-write') AS SELECT * FROM tmp_<your_table_name> """ spark.sql(query)
使用 MoR(Merge-On-Read,读时合并)表进行写入
当您更新或删除 MoR 表中的记录时,这些操作会添加新的数据文件。新添加的删除数据文件将在读取期间合并。但是,写入操作只需要向现有文件添加新文件。如果您写入表的频率高于从表中读取的频率,请选择 MoR 表。
要使用 MoR Iceberg 表,请运行以下 Spark SQL 命令:
spark.sql("ALTER TABLE <table-name> SET TBLPROPERTIES ('write.delete.mode'='merge-on-read','write.update.mode'='merge-on-read')")
**注意:**最佳做法:为了更好地控制属性,请使用 Spark 引擎创建 Iceberg 表。您也可以使用 AWS Glue、EMR Spark 或 Amazon Athena 来创建表。但是,Athena 对表属性的支持有限,仅使用 MoR 类型的表。
优化 Iceberg 表
当元数据文件、删除文件等的数量增加时,Iceberg 表的查询性能可能会下降。您可以使用以下几种方法来优化查询效率和数据存储。
使快照过期
Iceberg 表会保留快照,这样您就可以从表的旧状态中提取数据。每当对表进行写入操作时,都会生成相应的快照,并将相关的快照 ID 添加到新的元数据文件中。随着时间的推移,快照的数量增加,元数据文件的大小也会随之增大。这些不断增多的快照会导致查询性能延迟。
要淘汰快照,请使用以下选项:
-
在 Spark 中使用 expireSnapshots 操作,可并行淘汰大型表中早于指定时间戳的快照:
SparkActions.get() .expireSnapshots(table) .expireOlderThan(tsToExpire) .execute() -
或者,使用名为 expire_snapshots 的程序。有关更多信息,请参阅 Iceberg 网站上的 expire_snapshots。
spark.sql("CALL glue_catalog.system.expire_snapshots('databasename.tablename',<timestamp value>)")在 AWS Glue 任务中定期运行上述代码。当您自动淘汰快照时,您可以限制数据文件的数量,保持元数据文件较小并保持高效的查询性能。
移除旧的元数据文件
将 write.metadata.delete-after-commit.enabled 表属性设置为 True,以便在每次提交表后自动删除旧的元数据文件。您还可以设置 write.metadata.previous-versions-max 来管理要保留的旧元数据文件的数量。
重写清单文件
Iceberg 表使用清单和清单文件来跟踪所有数据文件。随着时间的推移,每个快照都引用许多清单文件。这些操作会减慢查询速度。有关更多信息,请参阅 Iceberg 网站上的清单列表和清单文件。
使用重写清单程序来高效地管理清单文件。有关更多信息,请参阅 Iceberg 网站上的 rewrite_manifests。
运行以下 Spark SQL 查询:
spark.sql("CALL glue_catalog.system.rewrite_manifests('databasename.tablename')")
重写数据文件
Iceberg 会在元数据文件中维护和跟踪表的所有数据文件。随着时间的推移,大量累积的数据文件会增加元数据文件的大小。元数据文件中不必要的或打开的文件会降低读取效率。Spark 中的 rewrite_data_files 程序有助于并行压缩数据并提高读取效率。有关更多信息,请参阅 Iceberg 网站上的 rewrite_data_files。
运行以下 Spark SQL 命令:
spark.sql("CALL glue_catalog.system.rewrite_data_files(table=>'databasename.tablename')")
使用 BINPACK 或 SORT 策略为您的用例重写数据文件。有关更多信息,请参阅 Iceberg 网站上的 BINPACK 和 SORT。
BINPACK: 这是最经济,也是最快的方法。它将较小的文件合并成较大的文件,并减少输出文件的总数。记录的顺序不会受到干扰,也不会出现数据打乱的情况。这是默认选项。
CALL catalog.system.rewrite_data_files( table => 'test_table', strategy => 'binpack', options => map( 'rewrite-job-order','bytes-asc', 'target-file-size-bytes','<set-afile-size>', 'max-file-group-size-bytes','<max-group-size>' -- 10GB ) )
SORT: SORT 策略会在压缩文件的同时对数据进行排序。当您运行许多比较相邻记录的聚合函数(例如,最小值或最大值函数)时,此策略很有用。
CALL catalog\_name.system.rewrite\_data\_files( table => 'databasename.tablename', strategy => 'sort', sort\_order => 'id', --can be any column options => map('rewrite-all','true') )
移除孤立文件
任何元数据文件中都不会引用孤立文件。有关更多信息,请参阅 Iceberg 网站上的 remove_orphan_files。
要移除孤立文件,请运行 remove_orphan_files 命令,如下所示:
spark.sql("CALL glue_catalog.system.remove_orphan_files(table=>'databasename.tablename')")
**注意:**最佳做法是运行有计划的任务来管理维护活动。使用单个 AWS Glue 任务来运行上述所有 Spark SQL 查询。
相关信息
相关内容
AWS 官方已更新 1 年前