我想将我的非数据湖表迁移到 Apache Iceberg 表。
简短描述
强大的数据湖架构允许您将各种数据源整合到一个位置以便于访问。之后,您可以使用数据管理工具和传统数据仓库系统的 ACID 事务来降低成本。
解决方法
Iceberg 表支持两种迁移:就地迁移和影子迁移。有关更多信息,请参阅 Iceberg 网站上的就地迁移和影子迁移。
使用 Iceberg 配置来初始化 Apache Spark 会话
在 AWS Glue 中创建新的 notebook。创建 notebook 时,请务必使用具有正确权限的 AWS Identity and Access Management (IAM) 角色。
完成以下步骤:
- 登录 AWS Glue 控制台。
- 创建 Glue notebook 并配置以下参数:
%glue_version 4.0
%idle_timeout 60
%number_of_workers 5
%worker_type G.1X
%%configure
{
"--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"--datalake-formats": "iceberg"
}
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config(f"spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
.config(f"spark.sql.catalog.glue_catalog.warehouse","s3://bucket_name/prefix/") \
.config(f"spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
.config(f"spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
.getOrCreate()
使用就地迁移来移动非数据湖表
就地迁移使源表的数据文件保持原样,然后将它们添加到新的 Iceberg 表中。该过程仅创建目标表的元数据。迁移期间发生的任何错误只需要重写元数据,而不需要重写数据文件本身。
要迁移表,请完成以下步骤:
**注意:**使用 add_files 获取 Parquet 数据。
-
创建一个与源表结构相同的空 Iceberg 表。
query = f"""
CREATE TABLE IF NOT EXISTS glue_catalog.dbname.target_iceberg_tablename USING iceberg LOCATION 's3://bucket_name/prefix/' AS
SELECT * FROM dbname.source_tablename LIMIT 0
"""
spark.sql(query)
-
运行 add_files 程序:
query = f"""
CALL glue_catalog.system.add_files(table => 'dbname.target_iceberg_tablename', source_table => 'dbname.source_tablename')
"""
spark.sql(query).show(truncate=False)
-
检查数据文件是否仍指向位于源位置的先前数据文件。只有在新位置创建的元数据文件才属于 Iceberg 表:
query = f"""
SELECT file_path FROM glue_catalog.dbname.target_iceberg_tablename.files
"""
spark.sql(query).show(10, truncate=False)
使用影子迁移来移动非数据湖表
影子迁移过程会重置源表的数据。您可以在迁移期间验证数据。您还可以通过为每种配置创建新表来测试不同的配置。
为避免任何冲突,这种类型的迁移会将目标数据文件放置在与源数据文件不同的位置。该过程还允许您使用回滚和删除目标表,以此作为从任何问题中快速恢复的一种方式。
要迁移表,请完成以下步骤:
-
使用现有表中的数据来创建 Iceberg 表:
query = f"""
CREATE TABLE IF NOT EXISTS glue_catalog.dbname.target_iceberg_tablename USING iceberg LOCATION 's3://bucket_name/prefix/' AS
SELECT * FROM dbname.source_tablename
"""
spark.sql(query).show(truncate=False)
-
检查数据文件是否已迁移到新位置。在新位置创建的元数据文件才属于 Iceberg 表:
query = f"""
SELECT file_path FROM glue_catalog.dbname.target_iceberg_tablename.files
"""
spark.sql(query).show(10, truncate=False)
相关信息
在 AWS Glue 中使用 Iceberg 框架