Perform UPSERT in S3 with awswrangler library

0

Hi,

I am trying to perform an upsert on an Iceberg table. The script below creates a table over raw data stored in Parquet format in an S3 bucket. It then creates an empty Iceberg table to be populated and eventually updated. When I try to insert data, it fails; please see the error further down.

The script:

import pandas as pd
import awswrangler as wr
import boto3

# Glue database names and the S3 location of the raw Parquet input.
database = "test"
iceberg_database = "iceberg_mid_sized_demo"
bucket_name = "test-bucket"
folder_name = "iceberg_mid_sized/raw_input"
# NOTE(review): `path` is not referenced again in the snippet as posted.
path = f"s3://{bucket_name}/{folder_name}"

session = boto3.Session()

glue_client = session.client('glue')

# Create the Glue database for the raw table. If it already exists, a
# message is printed and the script carries on.
try:
    glue_client.create_database(DatabaseInput={'Name': database})
    print('Database created')
    
except glue_client.exceptions.AlreadyExistsException as e:
    print("The database already exists")

# Create an external table over the input Parquet files.
# NOTE(review): the bucket and prefix are hard-coded in LOCATION instead of
# being taken from the variables defined earlier in the script.
# NOTE(review): ROW FORMAT DELIMITED is combined with STORED AS PARQUET;
# presumably the delimiter clause is unnecessary for Parquet data - confirm.
create_raw_data_table_query = """
CREATE EXTERNAL TABLE test.raw_input(
  op string, 
  id bigint, 
  name string, 
  city string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
STORED AS PARQUET
LOCATION 's3://test-bucket/iceberg_mid_sized/raw_input/'
tblproperties ("parquet.compress"="SNAPPY");
"""

# Submit the DDL to Athena and keep the returned query execution id.
# NOTE(review): nothing here checks that the query has finished or succeeded
# before the script moves on - confirm whether `wait` should be used.
create_raw_data_table_query_exec_id = wr.athena.start_query_execution(sql=create_raw_data_table_query, database=database)

# Create the Glue database that holds the Iceberg tables. If it already
# exists, a message is printed and the script carries on.
try:
    glue_client.create_database(DatabaseInput={'Name': iceberg_database})
    print('Database created')
    
except glue_client.exceptions.AlreadyExistsException as e:
    print("The database already exists")

# Create the (initially empty) output Iceberg table. Replace the S3 bucket
# name with your bucket name.
# NOTE(review): the DDL below declares no partitioning clause.
# NOTE(review): the LOCATION prefix is 'iceberg-mid_sized' (hyphen) while the
# raw input lives under 'iceberg_mid_sized' (underscore) - confirm intended.
create_output_iceberg_query = """
CREATE TABLE iceberg_mid_sized_demo.iceberg_output (
  id bigint,
  name string,
  city string
  ) 
LOCATION 's3://test-bucket/iceberg-mid_sized/iceberg_output/' 
TBLPROPERTIES (
  'table_type'='ICEBERG',
  'format'='parquet'
)
"""

# Submit the DDL to Athena and keep the returned query execution id.
# NOTE(review): as with the raw table, completion of the query is not
# checked before the table is used - confirm.
create_iceberg_table_query_exec_id = wr.athena.start_query_execution(sql=create_output_iceberg_query, database=iceberg_database)

# Upsert `val_df` into the Iceberg table, matching rows on `id`.
# NOTE(review): `val_df` is not defined anywhere in the snippet as posted.
# NOTE(review): per the traceback, merge_upsert_table loads the existing
# table with wr.s3.read_parquet_table, and that read is what raises
# ArrowInvalid here - presumably because the Iceberg table location holds
# files that are not Parquet; confirm.
primary_key = ['id']
wr.s3.merge_upsert_table(delta_df=val_df, database='iceberg_mid_sized_demo', table='iceberg_output', primary_key=primary_key)

This last line returns the following traceback and error:


ArrowInvalid                              Traceback (most recent call last)
/var/folders/y8/11mxbknn1sxbbq7vvhd14frr0000gn/T/ipykernel_17075/2358353780.py in <module>
      1 primary_key = ['id']
----> 2 wr.s3.merge_upsert_table(delta_df=val_df, database='iceberg_mid_sized_demo', table='iceberg_output', primary_key=primary_key)

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_merge_upsert_table.py in merge_upsert_table(delta_df, database, table, primary_key, boto3_session)
    111     if wr.catalog.does_table_exist(database=database, table=table, boto3_session=boto3_session):
    112         # Read the existing table into a pandas dataframe
--> 113         existing_df = wr.s3.read_parquet_table(database=database, table=table, boto3_session=boto3_session)
    114         # Check if data quality inside dataframes to be merged are sufficient
    115         if _is_data_quality_sufficient(existing_df=existing_df, delta_df=delta_df, primary_key=primary_key):

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/_config.py in wrapper(*args_raw, **kwargs)
    448                 del args[name]
    449                 args = {**args, **keywords}
--> 450         return function(**args)
    451 
    452     wrapper.__doc__ = _inject_config_doc(doc=function.__doc__, available_configs=available_configs)

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in read_parquet_table(table, database, filename_suffix, filename_ignore_suffix, catalog_id, partition_filter, columns, validate_schema, categories, safe, map_types, chunked, use_threads, boto3_session, s3_additional_kwargs)
    969         use_threads=use_threads,
    970         boto3_session=boto3_session,
--> 971         s3_additional_kwargs=s3_additional_kwargs,
    972     )
    973     partial_cast_function = functools.partial(

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in read_parquet(path, path_root, path_suffix, path_ignore_suffix, version_id, ignore_empty, ignore_index, partition_filter, columns, validate_schema, chunked, dataset, categories, safe, map_types, use_threads, last_modified_begin, last_modified_end, boto3_session, s3_additional_kwargs, pyarrow_additional_kwargs)
    767     if len(paths) == 1:
    768         return _read_parquet(
--> 769             path=paths[0], version_id=versions[paths[0]] if isinstance(versions, dict) else None, **args
    770         )
    771     if validate_schema is True:

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in _read_parquet(path, version_id, columns, categories, safe, map_types, boto3_session, dataset, path_root, s3_additional_kwargs, use_threads, pyarrow_additional_kwargs)
    538             use_threads=use_threads,
    539             version_id=version_id,
--> 540             pyarrow_additional_kwargs=pyarrow_args,
    541         ),
    542         categories=categories,

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in _read_parquet_file(path, columns, categories, boto3_session, s3_additional_kwargs, use_threads, version_id, pyarrow_additional_kwargs)
    480             source=f,
    481             read_dictionary=categories,
--> 482             coerce_int96_timestamp_unit=pyarrow_args["coerce_int96_timestamp_unit"],
    483         )
    484         if pq_file is None:

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in _pyarrow_parquet_file_wrapper(source, read_dictionary, coerce_int96_timestamp_unit)
     41         try:
     42             return pyarrow.parquet.ParquetFile(
---> 43                 source=source, read_dictionary=read_dictionary, coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
     44             )
     45         except TypeError as ex:

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/pyarrow/parquet.py in __init__(self, source, metadata, common_metadata, read_dictionary, memory_map, buffer_size, pre_buffer, coerce_int96_timestamp_unit)
    232             buffer_size=buffer_size, pre_buffer=pre_buffer,
    233             read_dictionary=read_dictionary, metadata=metadata,
--> 234             coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
    235         )
    236         self.common_metadata = common_metadata

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/pyarrow/_parquet.pyx in pyarrow._parquet.ParquetReader.open()

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()

ArrowInvalid: Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.

I have also tried running the script after replacing the following line:

wr.s3.merge_upsert_table(delta_df=val_df, database='iceberg_mid_sized_demo', table='iceberg_output', primary_key=primary_key)

with these


# Athena MERGE statement: delete target rows flagged 'D' in the source,
# update `city` on other matches, and insert rows with no match.
# NOTE(review): the parser error reported for this query (line 5:31,
# "mismatched input '.'") lands on the '.' of `t.city` in the UPDATE SET
# clause.
merge_into_query = """
MERGE INTO iceberg_mid_sized_demo.iceberg_output t
USING test.raw_input s
ON t.id = s.id
WHEN MATCHED AND s.op = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.city = s.city
WHEN NOT MATCHED THEN INSERT (id, name, city) VALUES (s.id, s.name, s.city)
;
"""

# NOTE(review): the workgroup is 'wgname' here but 'athena3' in the traceback
# of this call - presumably anonymised in one place only; confirm.
merge_into_query_id = wr.athena.start_query_execution(sql=merge_into_query,
                                                     database="iceberg_mid_sized_demo",
                                                    workgroup='wgname'
                                                     )

however, now I am getting:


---------------------------------------------------------------------------
InvalidRequestException                   Traceback (most recent call last)
/var/folders/y8/11mxbknn1sxbbq7vvhd14frr0000gn/T/ipykernel_17075/2112489404.py in <module>
      1 merge_into_query_id = wr.athena.start_query_execution(sql=merge_into_query,
      2                                                      database="iceberg_mid_sized_demo",
----> 3                                                     workgroup='athena3'
      4                                                      )

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/_config.py in wrapper(*args_raw, **kwargs)
    448                 del args[name]
    449                 args = {**args, **keywords}
--> 450         return function(**args)
    451 
    452     wrapper.__doc__ = _inject_config_doc(doc=function.__doc__, available_configs=available_configs)

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/athena/_utils.py in start_query_execution(sql, database, s3_output, workgroup, encryption, kms_key, params, boto3_session, max_cache_seconds, max_cache_query_inspections, max_remote_cache_entries, max_local_cache_entries, data_source, wait)
    494             encryption=encryption,
    495             kms_key=kms_key,
--> 496             boto3_session=session,
    497         )
    498     if wait:

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/athena/_utils.py in _start_query_execution(sql, wg_config, database, data_source, s3_output, workgroup, encryption, kms_key, boto3_session)
    101         ex_code="ThrottlingException",
    102         max_num_tries=5,
--> 103         **args,
    104     )
    105     return cast(str, response["QueryExecutionId"])

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/_utils.py in try_it(f, ex, ex_code, base, max_num_tries, **kwargs)
    341     for i in range(max_num_tries):
    342         try:
--> 343             return f(**kwargs)
    344         except ex as exception:
    345             if ex_code is not None and hasattr(exception, "response"):

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
    389                     "%s() only accepts keyword arguments." % py_operation_name)
    390             # The "self" in this scope is referring to the BaseClient.
--> 391             return self._make_api_call(operation_name, kwargs)
    392 
    393         _api_call.__name__ = str(py_operation_name)

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
    717             error_code = parsed_response.get("Error", {}).get("Code")
    718             error_class = self.exceptions.from_code(error_code)
--> 719             raise error_class(parsed_response, operation_name)
    720         else:
    721             return parsed_response

InvalidRequestException: An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: line 5:31: mismatched input '.'. Expecting: '='

How do you perform an UPSERT on Athena tables?

Thanks

1 Answer
1
Accepted Answer

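The following query provided the solution (make sure Athena engine version 3 is used). The change that matters is in the UPDATE clause: the target column is written without the table alias (`SET city = s.city`). The original `SET t.city = s.city` is what produced the `mismatched input '.'. Expecting: '='` error.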

# Same MERGE as in the question, except that UPDATE SET names the target
# column without the `t.` alias; the table names are quoted and the ON
# condition is parenthesised.
merge_into_query = """ MERGE INTO "iceberg_mid_sized_demo"."iceberg_output" t USING "test"."raw_input" s ON (t.id = s.id) WHEN MATCHED AND s.op = 'D' THEN DELETE WHEN MATCHED THEN UPDATE SET city = s.city WHEN NOT MATCHED THEN INSERT (id, name, city) VALUES (s.id, s.name, s.city) ; """

merge_into_query_id = wr.athena.start_query_execution(sql=merge_into_query, database="iceberg_mid_sized_demo", workgroup='athena3' )

answered a year ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions