Querying Delta tables from SQL Explorer in EMR Workspace
This article describes how to set up and access Delta tables from SQL Explorer in an EMR Studio Workspace. SQL Explorer uses the Presto engine configured on the attached EMR cluster to process data, so querying Delta tables requires integrating Presto with Delta Lake.
At a high level, this article walks through creating an EMR cluster configured with Presto, creating a Delta table with EMR Spark, attaching the cluster to an EMR Workspace or notebook, and making the configuration changes needed to integrate Delta with Presto. These changes are essential for accessing the Delta table from SQL Explorer.
Steps to configure EMR cluster
- Spin up an EMR cluster with the following applications installed: Presto, Hive, and JupyterEnterpriseGateway.
- Configure an external Hive metastore using Amazon RDS, following the example provided.
- Enable Delta Lake through the delta-defaults configuration classification (set delta.enabled to true).
- Make sure that your EMR cluster release is later than 6.4.0 and that in-transit encryption is not enabled. EMR Studio SQL Explorer does not support Presto clusters configured with in-transit encryption, because Presto then runs in TLS mode. An example create-cluster command is shown below.
aws emr create-cluster \
--name "presto-sql-explorer-external-HMS" \
--log-uri "s3://<YOUR-S3-BUCKET>/elasticmapreduce" \
--release-label "emr-7.1.0" \
--service-role "arn:aws:iam::xxxxxxxxxxxxx:role/EMR_DefaultRole" \
--unhealthy-node-replacement \
--ec2-attributes '{"InstanceProfile":"EMR_EC2_DefaultRole","EmrManagedMasterSecurityGroup":"sg-xxxxxxxxx","EmrManagedSlaveSecurityGroup":"sg-xxxxxxxxxx","KeyName":"testemr","AdditionalMasterSecurityGroups":[],"AdditionalSlaveSecurityGroups":[],"SubnetId":"subnet-xxxxxxxxxxxxxxx"}' \
--applications Name=Hive Name=JupyterEnterpriseGateway Name=Livy Name=Presto Name=Spark \
--configurations '[{"Classification":"hive-site","Properties":{"javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver","javax.jdo.option.ConnectionPassword":"********","javax.jdo.option.ConnectionURL":"jdbc:mysql://database-1.xxxxxxxxxxxxxxxxxx.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true","javax.jdo.option.ConnectionUserName":"admin"}},{"Classification":"delta-defaults","Properties":{"delta.enabled":"true"}}]' \
--instance-groups '[{"InstanceCount":1,"InstanceGroupType":"MASTER","Name":"Primary","InstanceType":"m5.xlarge","EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"VolumeType":"gp2","SizeInGB":32},"VolumesPerInstance":2}]},"Configurations":[{"Classification":"delta-defaults","Properties":{"delta.enabled":"true"}}]},{"InstanceCount":1,"InstanceGroupType":"CORE","Name":"Core","InstanceType":"m5.xlarge","EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"VolumeType":"gp2","SizeInGB":32},"VolumesPerInstance":2}]},"Configurations":[{"Classification":"delta-defaults","Properties":{"delta.enabled":"true"}}]}]' \
--scale-down-behavior "TERMINATE_AT_TASK_COMPLETION" \
--auto-termination-policy '{"IdleTimeout":3600}' \
--region "us-east-1"
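Optionally, before moving on, you can confirm that the cluster reached the WAITING state and that the expected applications are installed. This is a minimal check; the cluster ID below is a placeholder for the ClusterId returned by create-cluster.
# Replace j-XXXXXXXXXXXXX with your ClusterId
aws emr describe-cluster \
  --cluster-id "j-XXXXXXXXXXXXX" \
  --query "Cluster.{Name:Name,State:Status.State,Applications:Applications[].Name}" \
  --region "us-east-1"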
Configure Delta Table using Spark
- Once the EMR cluster is available, connect to the primary node and open the pyspark shell with the following Delta configurations:
pyspark --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
- Create a database with an S3 location. The S3 path must be accessible to the cluster's EC2 instance profile role under which the pyspark application runs:
spark.sql("""CREATE DATABASE delta_db LOCATION 's3://<YOUR-S3-BUCKET>/delta_db/'""")
spark.sql("show databases;").show()
+---------+
|namespace|
+---------+
| default|
| delta_db|
+---------+
- Create the Delta table as in the sample below. Make sure to set the delta.compatibility.symlinkFormatManifest.enabled table property, which generates a manifest file listing the Parquet files created as part of this table:
spark.sql("""CREATE TABLE IF NOT EXISTS delta_db.delta_table (
id bigint,
creation_date date,
last_update_time varchar(100))
USING delta location
's3://<YOUR-S3-BUCKET>/delta_db/delta_table'
TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)""");
- Insert some records as shown below and verify that they are committed:
spark.sql("""INSERT INTO delta_db.delta_table values ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z")""");
spark.sql("""INSERT INTO delta_db.delta_table values ("101", "2016-01-01", "2016-01-01T13:51:39.340396Z")""");
spark.sql("select * from delta_db.delta_table").show()
+---+-------------+--------------------+
| id|creation_date| last_update_time|
+---+-------------+--------------------+
|100| 2015-01-01|2015-01-01T13:51:...|
|101| 2016-01-01|2016-01-01T13:51:...|
+---+-------------+--------------------+
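As an optional sanity check (not part of the original walkthrough), you can confirm that the symlink manifest was written under the table's S3 prefix:
aws s3 ls s3://<YOUR-S3-BUCKET>/delta_db/delta_table/_symlink_format_manifest/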
Connect with the EMR Workspace
- Launch EMR Studio and start the workspace. Configure the workspace if not done already.
- Attach the EMR cluster. Do not select a runtime role, because SQL Explorer does not work with runtime roles. If a runtime role is attached, launching SQL Explorer throws an exception such as "SQL Explorer is not enabled when attached to a runtime role enabled EMR on EC2 cluster".
- Connect with SQL Explorer and browse to the table you just created. At this point, the query fails with an Internal Server Error.
- You can also connect to the primary node of the EMR cluster and check the Presto server log at /var/log/presto/server.log, which shows the underlying exception:
2024-07-09T11:35:35.201Z ERROR SplitRunner-4-121 com.facebook.presto.execution.executor.TaskExecutor Error processing Split 20240709_113534_00004_au39i.2.0.0.0-0 InformationSchemaSplit{tableHandle=hive:information_schema:columns, prefixes=[hive.delta_db.delta_table], addresses=[172.31.2.142:8889]} (start = 3836825.505599, wall = 12 ms, cpu = 0 ms, wait = 0 ms, calls = 1): HIVE_UNSUPPORTED_FORMAT: Not a Hive table 'delta_db.delta_table'
2024-07-09T11:35:35.211Z ERROR remote-task-callback-13 com.facebook.presto.execution.StageExecutionStateMachine Stage execution 20240709_113534_00004_au39i.2.0 failed
com.facebook.presto.spi.PrestoException: Not a Hive table 'delta_db.delta_table'
at com.facebook.presto.hive.HiveMetadata.getTableMetadata(HiveMetadata.java:641)
at com.facebook.presto.hive.HiveMetadata.getTableMetadata(HiveMetadata.java:631)
at com.facebook.presto.hive.HiveMetadata.listTableColumns(HiveMetadata.java:822)
- (Optional) To reproduce the exception, connect with presto-cli and run the query; it fails with the same error. Essentially, the table is not recognized by Presto. Proceed to the next section to integrate the Delta table with Presto.
presto> select * from hive.delta_db.delta_table;
Query 20240709_114029_00008_au39i failed: Not a Hive table 'delta_db.delta_table'
Integrate Delta with Presto
- Because this cluster was created with an external Hive metastore and Presto, the hive catalog is set with the properties below by default. The hive.properties file is located at /etc/presto/conf/catalog. Verify that connector.name is set to hive-hadoop2 and that hive.metastore.uri is set to thrift://<PrimaryNodeDNS>:9083. If the metastore were configured with the AWS Glue Data Catalog, hive.metastore would be set to glue; however, that configuration does not work for querying Delta tables.
hive.metastore-refresh-interval=1m
connector.name=hive-hadoop2
hive.metastore-cache-ttl=20m
hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
hive.non-managed-table-writes-enabled = true
hive.s3-file-system-type = PRESTO
hive.hdfs.impersonation.enabled = true
hive.metastore.uri = thrift://ip-172-31-2-142.ec2.internal:9083
- Change hive.s3-file-system-type to PRESTO; it points to EMRFS by default, but querying Delta tables requires the Presto S3 file system type (a scripted way to make this change is sketched after the restart commands below).
- Restart the Presto service and confirm that it is running:
sudo systemctl restart presto-server.service
sudo systemctl status presto-server.service
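If you prefer to script the property change rather than editing the file by hand, a rough sketch like the following can be run on each node before restarting Presto (the sed pattern is an assumption about the default file layout):
# Set hive.s3-file-system-type to PRESTO in the hive catalog properties, then restart
sudo sed -i 's/^hive.s3-file-system-type.*/hive.s3-file-system-type = PRESTO/' \
  /etc/presto/conf/catalog/hive.properties
sudo systemctl restart presto-server.service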
- To allow Presto to understand the table's metadata, register the Delta table with the Hive metastore; Presto uses the Hive connector to read Delta tables. Create an external table (for example, from the Hive CLI on the primary node) with the same definition used earlier, using SymlinkTextInputFormat as the input format. Point the location to the delta_table created from Spark, specifically to the manifest directory _symlink_format_manifest:
CREATE EXTERNAL TABLE IF NOT EXISTS delta_db.delta_table (
id bigint, creation_date date, last_update_time varchar(100)
) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://<YOUR-S3-BUCKET>/delta_db/delta_table/_symlink_format_manifest/';
- Query the Delta table using presto-cli or the SQL Explorer editor, which connects to the hive catalog and navigates to the table:
presto> desc hive.delta_db.delta_table;
Column | Type | Extra | Comment
------------------+--------------+-------+---------
id | bigint | |
creation_date | date | |
last_update_time | varchar(100) | |
(3 rows)
Query 20240709_121623_00019_vpaxz, FINISHED, 2 nodes
Splits: 19 total, 19 done (100.00%)
[Latency: client-side: 240ms, server-side: 157ms] [3 rows, 227B] [19 rows/s, 1.41KB/s]
presto> select * from hive.delta_db.delta_table;
id | creation_date | last_update_time
-----+---------------+-----------------------------
101 | 2016-01-01 | 2016-01-01T13:51:39.340396Z
100 | 2015-01-01 | 2015-01-01T13:51:39.340396Z
(2 rows)
Query 20240709_121625_00020_vpaxz, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
[Latency: client-side: 0:01, server-side: 0:01] [2 rows, 2.57KB] [2 rows/s, 3.11KB/s]
In this approach, we integrated Delta with Presto using the manifest file method. If you already have existing Delta tables and need to generate manifests or alter table properties, refer to the Delta Lake documentation on Presto integration for configuration guidance; a short sketch of generating a manifest from Spark is shown below.
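The following is a minimal PySpark sketch, assuming the same Delta-enabled pyspark session as above and the table path used in this article, of generating the symlink manifest for an existing Delta table and keeping it up to date on future writes:
# Assumes a Delta-enabled Spark session (the pyspark shell started earlier)
from delta.tables import DeltaTable

table_path = "s3://<YOUR-S3-BUCKET>/delta_db/delta_table"  # path used in this article
DeltaTable.forPath(spark, table_path).generate("symlink_format_manifest")

# Optionally regenerate the manifest automatically on every write to the table
spark.sql(f"ALTER TABLE delta.`{table_path}` "
          "SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)")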
Alternative: use the Presto Delta connector
Alternatively, you can access Delta tables in Presto using the Delta connector, as described in the Presto Delta connector documentation. In this approach, create a Delta catalog file (located at /etc/presto/conf/catalog/delta.properties) with at least the properties below on all nodes, and restart the Presto server service. You can also script this configuration (for example, with a bootstrap action or an EMR step) instead of changing the files manually; a sketch follows the properties below.
connector.name=delta
hive.metastore.uri=thrift://<PrimaryNodeDNS>:9083
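As a rough sketch (assuming the file path above and that Presto is already installed on the node), a script like the following could be run on each node, for example over SSH or as an EMR step, to create the catalog file and restart Presto. Replace <PrimaryNodeDNS> with the primary node's internal DNS name.
#!/bin/bash
# Create the Delta catalog file and restart Presto on this node
sudo tee /etc/presto/conf/catalog/delta.properties > /dev/null <<'EOF'
connector.name=delta
hive.metastore.uri=thrift://<PrimaryNodeDNS>:9083
EOF
sudo systemctl restart presto-server.service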
Register the table using the definition below. Note that, in this approach, the external table location points to the delta_table directory itself rather than the manifest directory.
CREATE EXTERNAL TABLE delta_db.delta_table (
id bigint, creation_date date, last_update_time varchar(100)
)
LOCATION 's3://<YOUR-S3-BUCKET>/delta_db/delta_table/';
You can access the Delta table using presto-cli or SQL Explorer. Use the fully qualified form <delta_catalog>.<database>.<table>, which ensures the query goes through the delta catalog and references the schema that delta_table belongs to. For example, from presto-cli:
presto> select * from delta.delta_db.delta_table;
id | creation_date | last_update_time
-----+---------------+-----------------------------
100 | 2015-01-01 | 2015-01-01T13:51:39.340396Z
101 | 2016-01-01 | 2016-01-01T13:51:39.340396Z
(2 rows)
Query 20240709_132430_00014_46bb3, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
[Latency: client-side: 0:17, server-side: 0:17] [2 rows, 2.57KB] [0 rows/s, 151B/s]