Insert a DataFrame into an Iceberg table in a PySpark script


I wrote the following code in a PySpark shell in AWS Glue to create an Iceberg table and insert a pandas DataFrame into it. The code executes successfully, but the output is empty: there is no data in the Iceberg table when I run a query in Athena. parquet_dyf.errorsCount() reports no issues or errors either. Can anyone help me with this?

sql_query = f"""
    CREATE TABLE {table_name} ({col_name_types})
    PARTITIONED BY (partition_0)
    LOCATION '{s3_location_dh}{table_name}'
    TBLPROPERTIES (
        'table_type' = 'ICEBERG',
        'format' = 'parquet',
        'write_target_data_file_size_bytes' = '536870912'
    )
"""
get_query_results(executeQuery(sql_query, dhSouce, outputConfig))

    # insert the dataframe into the Iceberg table
    if not df.empty:
        s3_path = f"{s3_location_dh}{table_name}/data/{partition_date}.parquet"
        wr.s3.to_parquet(df=df, path=s3_path)
        # note: the S3 connection option key is 'paths' (a list), not 'path'
        parquet_dyf = glueContext.create_dynamic_frame_from_options(
            's3', {'paths': [s3_path]}, format='parquet'
        )
        glueContext.write_dynamic_frame.from_catalog(
            frame=parquet_dyf, database=database_name, table_name=table_name
        )
  • How about parquet_dyf.count(), does it give you a non-zero number? I assume the write command completed without error?

asked a year ago · 294 views
1 Answer

Hi,

It seems that Iceberg tables are supported by only a few frameworks, including Spark, Hive, and Flink; see the Iceberg documentation for the full list.

I tried different ways to write a pandas DataFrame to an Iceberg table, but each attempt produced a separate parquet file that is not registered in the Iceberg table's metadata, so querying the table in Athena shows no results.

However, it works fine with a Spark DataFrame: I am able to add partitions to the Iceberg table. I would suggest converting your pandas DataFrame to a Spark DataFrame before writing it to Iceberg tables:

sparkDF=spark.createDataFrame(pandasDF) 
sparkDF.show()

To append a DataFrame to an Iceberg table, use append():

sparkDF.writeTo("prod.db.table").append()

See the Iceberg Spark documentation for more details.
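In a Glue job where the Spark catalog is registered as glue_catalog (see the conf note further below), the qualified table name would look like the following. This is only a sketch, using the database and table names from the DDL later in this answer:

# Sketch: appending with a Glue-catalog-qualified table name.
# "glue_catalog" is the Spark catalog name configured in the job parameters.
sparkDF.writeTo("glue_catalog.iceberg_db.iceberg_table").append()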


If you have a dependency on using only Python for data manipulation, then you can consider PyIceberg:

PyIceberg is a Python implementation for accessing Iceberg tables without the need for a JVM, so you can perform your table operations entirely in Python.
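As a minimal sketch (my addition, not from the original answer), assuming pyiceberg with the Glue extra is installed (pip install "pyiceberg[glue]", version 0.6 or later for append support) and using the database and table names from the DDL below:

import pandas as pd
import pyarrow as pa
from pyiceberg.catalog import load_catalog

# Load the Glue catalog and the Iceberg table created in Athena.
catalog = load_catalog("default", type="glue")
table = catalog.load_table("iceberg_db.iceberg_table")

# Append a pandas DataFrame by converting it to a pyarrow Table first.
df = pd.DataFrame({"col1": ["a"], "col2": ["b"], "col3": ["c"], "col4": [1]})
table.append(pa.Table.from_pandas(df))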


When you run a Glue job from the Glue console, you can follow the Glue script below to write to an Iceberg table using the Glue catalog.

- You can read the data directly from S3 and then load the DataFrame into the cataloged Iceberg table pointing to an S3 path (I created the table in Athena, then used a Glue job to insert the data).

Athena DDL:

CREATE TABLE iceberg_db.iceberg_table (
  col1 string,
  col2 string,
  col3 string,
  col4 int
)
PARTITIONED BY (`col3`)
LOCATION 's3://iceberg-bucket/target/'
TBLPROPERTIES (
  'table_type'='iceberg',
  'format'='parquet',
  'write_target_data_file_size_bytes'='536870912'
);

Make sure to add the necessary conf parameters to the Glue job; see the AWS documentation on using Iceberg with Glue for details. A typical setup is sketched below.
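As a sketch, one common configuration is to set the job parameter --datalake-formats to iceberg and pass the Spark conf in a single --conf job parameter (the warehouse bucket here is a placeholder; the exact keys depend on your Glue version):

--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
--conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.glue_catalog.warehouse=s3://iceberg-bucket/warehouse/
--conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
--conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO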

glue script:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SparkSession, Row, DataFrame
import pandas as pd
from awsglue.dynamicframe import DynamicFrameCollection
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# reading into a pandas dataframe (reading s3:// paths with pandas requires s3fs)
pandasDF = pd.read_csv("s3://bucket-namea/folder/file.csv")
print(pandasDF)

#converting to spark dataframe
sparkDF=spark.createDataFrame(pandasDF) 

sparkDF.show()


# write the Spark dataframe to the Iceberg table
glueContext.write_data_frame.from_catalog(
    frame=sparkDF,
    database="iceberg_db",
    table_name="iceberg_table"
)

job.commit()
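An equivalent sketch (my addition, not from the original answer), assuming the glue_catalog Spark catalog configured above, is to append through Spark SQL:

# Alternative: insert through Spark SQL using a temporary view.
sparkDF.createOrReplaceTempView("tmp_incoming")
spark.sql("""
    INSERT INTO glue_catalog.iceberg_db.iceberg_table
    SELECT * FROM tmp_incoming
""")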

- Now, if you want to manually create a pandas DataFrame and then load it into the Iceberg table in S3 using the catalog:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SparkSession, Row, DataFrame
import pandas as pd
from awsglue.dynamicframe import DynamicFrameCollection
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

students = {
    'col1': ['value1', 'value2'],
    'col2': ['value1', 'value2'],
    'col3': ['value1', 'value2'],
    'col4': [1, 2]  # col4 is declared int in the Athena DDL above
}
pandasDF = pd.DataFrame(students)
print(pandasDF)

#Create Spark DataFrame from Pandas
sparkDF=spark.createDataFrame(pandasDF) 

sparkDF.show()

# write the Spark dataframe to the Iceberg table
glueContext.write_data_frame.from_catalog(
    frame=sparkDF,
    database="iceberg_db",
    table_name="iceberg_table"
)

job.commit()

answered a year ago
