Perform UPSERT in S3 with awswrangler library
Hi,
I am trying to perform an upsert of an Iceberg table. The script below creates a table over raw data stored in Parquet format in an S3 bucket, then creates an empty Iceberg table to be populated and eventually updated. When I try to upsert data, it fails; please see the error further down.
The script:
import pandas as pd
import awswrangler as wr
import boto3
database = "test"
iceberg_database = "iceberg_mid_sized_demo"
bucket_name = "test-bucket"
folder_name = "iceberg_mid_sized/raw_input"
path = f"s3://{bucket_name}/{folder_name}"
session = boto3.Session()
glue_client = session.client('glue')
try:
    glue_client.create_database(DatabaseInput={'Name': database})
    print('Database created')
except glue_client.exceptions.AlreadyExistsException as e:
    print("The database already exists")
# Create external table over the input parquet files.
create_raw_data_table_query = """
CREATE EXTERNAL TABLE test.raw_input(
op string,
id bigint,
name string,
city string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET
LOCATION 's3://test-bucket/iceberg_mid_sized/raw_input/'
tblproperties ("parquet.compress"="SNAPPY");
"""
create_raw_data_table_query_exec_id = wr.athena.start_query_execution(sql=create_raw_data_table_query, database=database)
# Create the Iceberg tables database
try:
    glue_client.create_database(DatabaseInput={'Name': iceberg_database})
    print('Database created')
except glue_client.exceptions.AlreadyExistsException as e:
    print("The database already exists")
# Create output Iceberg table. Replace the S3 bucket name with your bucket name
create_output_iceberg_query = """
CREATE TABLE iceberg_mid_sized_demo.iceberg_output (
id bigint,
name string,
city string
)
LOCATION 's3://test-bucket/iceberg-mid_sized/iceberg_output/'
TBLPROPERTIES (
'table_type'='ICEBERG',
'format'='parquet'
)
"""
create_iceberg_table_query_exec_id = wr.athena.start_query_execution(sql=create_output_iceberg_query, database=iceberg_database)
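# (Assumption, not in the original post) val_df used below is never defined in the
# question; it is assumed to be the delta DataFrame holding the rows to upsert, e.g.:
val_df = pd.DataFrame(
    {
        "op": ["U", "I"],   # hypothetical change indicator: 'U' update, 'I' insert, 'D' delete
        "id": [1, 4],
        "name": ["Alice", "Dave"],
        "city": ["Lisbon", "Oslo"],
    }
)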
primary_key = ['id']
wr.s3.merge_upsert_table(delta_df=val_df, database='iceberg_mid_sized_demo', table='iceberg_output', primary_key=primary_key)
This last line returns the following traceback and error:
---------------------------------------------------------------------------
ArrowInvalid                              Traceback (most recent call last)
/var/folders/y8/11mxbknn1sxbbq7vvhd14frr0000gn/T/ipykernel_17075/2358353780.py in <module>
      1 primary_key = ['id']
----> 2 wr.s3.merge_upsert_table(delta_df=val_df, database='iceberg_mid_sized_demo', table='iceberg_output', primary_key=primary_key)

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_merge_upsert_table.py in merge_upsert_table(delta_df, database, table, primary_key, boto3_session)
    111     if wr.catalog.does_table_exist(database=database, table=table, boto3_session=boto3_session):
    112         # Read the existing table into a pandas dataframe
--> 113         existing_df = wr.s3.read_parquet_table(database=database, table=table, boto3_session=boto3_session)
    114         # Check if data quality inside dataframes to be merged are sufficient
    115         if _is_data_quality_sufficient(existing_df=existing_df, delta_df=delta_df, primary_key=primary_key):

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/_config.py in wrapper(*args_raw, **kwargs)
    448             del args[name]
    449         args = {**args, **keywords}
--> 450         return function(**args)
    451
    452     wrapper.__doc__ = _inject_config_doc(doc=function.__doc__, available_configs=available_configs)

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in read_parquet_table(table, database, filename_suffix, filename_ignore_suffix, catalog_id, partition_filter, columns, validate_schema, categories, safe, map_types, chunked, use_threads, boto3_session, s3_additional_kwargs)
    969         use_threads=use_threads,
    970         boto3_session=boto3_session,
--> 971         s3_additional_kwargs=s3_additional_kwargs,
    972     )
    973     partial_cast_function = functools.partial(

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in read_parquet(path, path_root, path_suffix, path_ignore_suffix, version_id, ignore_empty, ignore_index, partition_filter, columns, validate_schema, chunked, dataset, categories, safe, map_types, use_threads, last_modified_begin, last_modified_end, boto3_session, s3_additional_kwargs, pyarrow_additional_kwargs)
    767     if len(paths) == 1:
    768         return _read_parquet(
--> 769             path=paths[0], version_id=versions[paths[0]] if isinstance(versions, dict) else None, **args
    770         )
    771     if validate_schema is True:

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in _read_parquet(path, version_id, columns, categories, safe, map_types, boto3_session, dataset, path_root, s3_additional_kwargs, use_threads, pyarrow_additional_kwargs)
    538             use_threads=use_threads,
    539             version_id=version_id,
--> 540             pyarrow_additional_kwargs=pyarrow_args,
    541         ),
    542         categories=categories,

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in _read_parquet_file(path, columns, categories, boto3_session, s3_additional_kwargs, use_threads, version_id, pyarrow_additional_kwargs)
    480             source=f,
    481             read_dictionary=categories,
--> 482             coerce_int96_timestamp_unit=pyarrow_args["coerce_int96_timestamp_unit"],
    483         )
    484         if pq_file is None:

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in _pyarrow_parquet_file_wrapper(source, read_dictionary, coerce_int96_timestamp_unit)
     41     try:
     42         return pyarrow.parquet.ParquetFile(
---> 43             source=source, read_dictionary=read_dictionary, coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
     44         )
     45     except TypeError as ex:

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/pyarrow/parquet.py in __init__(self, source, metadata, common_metadata, read_dictionary, memory_map, buffer_size, pre_buffer, coerce_int96_timestamp_unit)
    232                 buffer_size=buffer_size, pre_buffer=pre_buffer,
    233                 read_dictionary=read_dictionary, metadata=metadata,
--> 234                 coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
    235             )
    236         self.common_metadata = common_metadata

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/pyarrow/_parquet.pyx in pyarrow._parquet.ParquetReader.open()

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()

ArrowInvalid: Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.
I have also tried to run the script replacing the following line:
wr.s3.merge_upsert_table(delta_df=val_df, database='iceberg_mid_sized_demo', table='iceberg_output', primary_key=primary_key)
with the following:
merge_into_query = """MERGE INTO iceberg_mid_sized_demo.iceberg_output t
USING test.raw_input s
ON t.id = s.id
WHEN MATCHED AND s.op = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.city = s.city
WHEN NOT MATCHED THEN INSERT (id, name, city) VALUES (s.id, s.name, s.city)
;
"""
merge_into_query_id = wr.athena.start_query_execution(
    sql=merge_into_query,
    database="iceberg_mid_sized_demo",
    workgroup='wgname'
)
However, now I am getting:
---------------------------------------------------------------------------
InvalidRequestException                   Traceback (most recent call last)
/var/folders/y8/11mxbknn1sxbbq7vvhd14frr0000gn/T/ipykernel_17075/2112489404.py in <module>
      1 merge_into_query_id = wr.athena.start_query_execution(sql=merge_into_query,
      2                                                       database="iceberg_mid_sized_demo",
----> 3                                                       workgroup='athena3'
      4                                                       )

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/_config.py in wrapper(*args_raw, **kwargs)
    448             del args[name]
    449         args = {**args, **keywords}
--> 450         return function(**args)
    451
    452     wrapper.__doc__ = _inject_config_doc(doc=function.__doc__, available_configs=available_configs)

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/athena/_utils.py in start_query_execution(sql, database, s3_output, workgroup, encryption, kms_key, params, boto3_session, max_cache_seconds, max_cache_query_inspections, max_remote_cache_entries, max_local_cache_entries, data_source, wait)
    494         encryption=encryption,
    495         kms_key=kms_key,
--> 496         boto3_session=session,
    497     )
    498     if wait:

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/athena/_utils.py in _start_query_execution(sql, wg_config, database, data_source, s3_output, workgroup, encryption, kms_key, boto3_session)
    101         ex_code="ThrottlingException",
    102         max_num_tries=5,
--> 103         **args,
    104     )
    105     return cast(str, response["QueryExecutionId"])

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/_utils.py in try_it(f, ex, ex_code, base, max_num_tries, **kwargs)
    341     for i in range(max_num_tries):
    342         try:
--> 343             return f(**kwargs)
    344         except ex as exception:
    345             if ex_code is not None and hasattr(exception, "response"):

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
    389                 "%s() only accepts keyword arguments." % py_operation_name)
    390             # The "self" in this scope is referring to the BaseClient.
--> 391             return self._make_api_call(operation_name, kwargs)
    392
    393         _api_call.__name__ = str(py_operation_name)

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
    717             error_code = parsed_response.get("Error", {}).get("Code")
    718             error_class = self.exceptions.from_code(error_code)
--> 719             raise error_class(parsed_response, operation_name)
    720         else:
    721             return parsed_response

InvalidRequestException: An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: line 5:31: mismatched input '.'. Expecting: '='
How do you perform UPSERT of Athena tables?
Thanks
The following query provided the solution (make sure Athena engine version 3 is used). Two things to note: in Athena's MERGE syntax the UPDATE SET clause must reference the target column without the table alias, so the earlier SET t.city = s.city is what triggered the "mismatched input '.'" error; and wr.s3.merge_upsert_table is not Iceberg-aware (the first traceback shows it reading the table location as plain Parquet, which fails on Iceberg's metadata files), so the MERGE INTO approach is the right one here.
merge_into_query = """MERGE INTO "iceberg_mid_sized_demo"."iceberg_output" t
USING "test"."raw_input" s
ON (t.id = s.id)
WHEN MATCHED AND s.op = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET city = s.city
WHEN NOT MATCHED THEN INSERT (id, name, city) VALUES (s.id, s.name, s.city)
;
"""
merge_into_query_id = wr.athena.start_query_execution(
    sql=merge_into_query,
    database="iceberg_mid_sized_demo",
    workgroup='athena3'
)
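For completeness, here is a minimal sketch (not part of the original answer) for waiting on the MERGE and reading the merged table back to check the result, assuming the same 'athena3' workgroup on engine version 3:

# Block until the MERGE query has finished (raises if the query fails or is cancelled).
wr.athena.wait_query(query_execution_id=merge_into_query_id)

# Read the Iceberg table back through Athena to confirm the upserted rows.
# ctas_approach=False runs a plain query instead of staging a temporary CTAS table.
merged_df = wr.athena.read_sql_query(
    sql='SELECT * FROM "iceberg_mid_sized_demo"."iceberg_output" ORDER BY id',
    database="iceberg_mid_sized_demo",
    workgroup="athena3",
    ctas_approach=False,
)
print(merged_df)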
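Aside from the MERGE INTO query above: if you are on a recent awswrangler 3.x release, wr.athena.to_iceberg may be worth a look as well; assuming your version supports the merge_cols argument, it can run the upsert for you straight from a DataFrame. A hedged sketch (the temp_path below is hypothetical):

# Hedged sketch, not from the accepted answer. Assumes awswrangler 3.x with merge_cols
# support; rows matching on 'id' are updated, all others are inserted (no deletes).
wr.athena.to_iceberg(
    df=val_df[["id", "name", "city"]],                      # drop the 'op' change-indicator column
    database="iceberg_mid_sized_demo",
    table="iceberg_output",
    temp_path="s3://test-bucket/iceberg_mid_sized/temp/",   # hypothetical staging location
    workgroup="athena3",
    merge_cols=["id"],
)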