Questions tagged with AWS Glue
I've created a data quality validation node in my Glue ETL job, which imports the following:
`from awsgluedq.transforms import EvaluateDataQuality`
To develop the script further, I've copied it into an AWS Glue notebook. But that line doesn't work there; it throws the error:
`ModuleNotFoundError: No module named 'awsgluedq'`
I've tried adding it through the magics `%extra_py_files ['awsgluedq']` and `%additional_python_modules ['awsgluedq']`, but neither works.
How can I import that module?
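For context, this is roughly how the transform is used in the Glue job runtime where the import succeeds (a minimal sketch; the ruleset, frame, and context names are placeholders, not taken from the original job):

```python
from awsgluedq.transforms import EvaluateDataQuality

# Placeholder ruleset written in DQDL (Data Quality Definition Language).
ruleset = """
    Rules = [
        ColumnCount > 0,
        IsComplete "id"
    ]
"""

# `dyf` is assumed to be an existing DynamicFrame from earlier in the script.
dq_results = EvaluateDataQuality.apply(
    frame=dyf,
    ruleset=ruleset,
    publishing_options={
        "dataQualityEvaluationContext": "my_dq_check",
        "enableDataQualityCloudWatchMetrics": False,
        "enableDataQualityResultsPublishing": False,
    },
)
dq_results.toDF().show()
```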
We're using the `GlueSchemaRegistryDeserializerDataParser` class from https://github.com/awslabs/aws-glue-schema-registry.
This seems to be built against v1 of the AWS SDK for Java (or am I wrong?)
Is there a replacement in aws-sdk-java-v2 (https://github.com/aws/aws-sdk-java-v2)?
Here is my code snippet:
```python
transaction_DyF = glueContext.create_dynamic_frame.from_catalog(
    database=source_db,
    table_name=source_tbl,
    push_down_predicate=pushDownPredicate,
    transformation_ctx=source_tbl,
)
```
Error Message:
```
An error occurred while calling o103.getDynamicFrame.
: java.lang.ClassNotFoundException: Failed to find data source: UNKNOWN
```
I have created a Glue job that reads Parquet files from S3 and uses the Iceberg connector to create an Iceberg table. I used `my_catalog` as the catalog name, created a database named `db`, and named the table `sampletable`. However, when I run the job it fails with the error below:
**AnalysisException: The namespace in session catalog must have exactly one name part: my_catalog.db.sampletable**
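This error usually suggests that Spark is resolving the three-part name `my_catalog.db.sampletable` against the default session catalog because no catalog called `my_catalog` is registered. A minimal sketch of registering it as an Iceberg catalog backed by the Glue Data Catalog follows; the warehouse and input paths are placeholders, and in a Glue job these settings are typically passed via the `--conf` job parameter rather than in code:

```python
from pyspark.sql import SparkSession

# Sketch: register "my_catalog" as an Iceberg SparkCatalog backed by the Glue Data Catalog.
spark = (
    SparkSession.builder
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.my_catalog.catalog-impl",
            "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.my_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.my_catalog.warehouse", "s3://your-bucket/warehouse/")  # placeholder
    .getOrCreate()
)

# With the catalog registered, the three-part identifier resolves correctly.
df = spark.read.parquet("s3://your-bucket/input/")  # placeholder input path
df.writeTo("my_catalog.db.sampletable").using("iceberg").createOrReplace()
```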
I'm writing into Redshift and realized that Glue 4.0 is probably optimizing the column sizes.
Summary of error:
```
py4j.protocol.Py4JJavaError: An error occurred while calling o236.pyWriteDynamicFrame.
: java.sql.SQLException:
Error (code 1204) while loading data into Redshift: "String length exceeds DDL length"
Table name: "PUBLIC"."table_name"
Column name: column_a
Column type: varchar(256)
```
In previous Glue versions the string columns were always created as varchar(65535), but now my tables are created with varchar(256), and writes into some columns fail with this error. Will this also occur with other data types? How can I solve this within Glue 4.0?
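One workaround worth trying is a sketch, not a confirmed Glue 4.0 fix, and it assumes the Glue 4.0 Redshift connector honors the spark-redshift `maxlength` column metadata when generating DDL. `withMetadata` requires Spark 3.3+, which Glue 4.0 ships with; `dyf` and `column_a` are placeholders:

```python
from awsglue.dynamicframe import DynamicFrame

# Assume `dyf` is the DynamicFrame about to be written to Redshift.
df = dyf.toDF()

# Ask the connector to create this column as varchar(65535) instead of the default varchar(256).
df = df.withMetadata("column_a", {"maxlength": 65535})

dyf_resized = DynamicFrame.fromDF(df, glueContext, "dyf_resized")
# ...then write dyf_resized with glueContext.write_dynamic_frame as before.
```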
Can we add two triggers to the same workflow, each with a different type: one CONDITIONAL and the other SCHEDULED?
My workflow design is:
There is a SCHEDULED trigger that starts two jobs. On successful completion of these two jobs, one more trigger has to be invoked.
Unfortunately I could not find any documentation in various online forums. Can anyone help here please?
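For what it's worth, this combination can be expressed with boto3; the sketch below assumes placeholder names (`my_workflow`, `job_a`, `job_b`, `job_c`) and a placeholder schedule:

```python
import boto3

glue = boto3.client("glue")

# Scheduled trigger that starts the first two jobs in the workflow.
glue.create_trigger(
    Name="start-nightly-jobs",
    WorkflowName="my_workflow",
    Type="SCHEDULED",
    Schedule="cron(0 2 * * ? *)",  # placeholder schedule
    Actions=[{"JobName": "job_a"}, {"JobName": "job_b"}],
    StartOnCreation=True,
)

# Conditional trigger in the same workflow: fires only when both jobs succeed.
glue.create_trigger(
    Name="after-both-jobs",
    WorkflowName="my_workflow",
    Type="CONDITIONAL",
    Predicate={
        "Logical": "AND",
        "Conditions": [
            {"LogicalOperator": "EQUALS", "JobName": "job_a", "State": "SUCCEEDED"},
            {"LogicalOperator": "EQUALS", "JobName": "job_b", "State": "SUCCEEDED"},
        ],
    },
    Actions=[{"JobName": "job_c"}],
    StartOnCreation=True,
)
```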
Hello,
There are similar questions on the forum, and most recommendations are to override the data type, but I wanted to give context on what I'm trying to achieve in case there's a better solution.
I have a number of large CSV files which I've compressed with BZ2 (splittable compression) in order to convert them to Parquet and query them in Athena. I've done this in Glue using an S3 data source and an S3 output to Parquet with Snappy compression (again splittable), which works fine in terms of actually completing the conversion and surfacing the data in Athena. However, there are data quality issues caused by mismatched types: for instance, the crawler detects the id columns on the BZ2 CSVs as string for some ids and long for others, and once converted to Parquet, Athena complains about binary data in bigint fields:
```
HIVE_BAD_DATA: Field diagnosis1's type BINARY in parquet file s3://bucket_name/parquet/run-S3bucket_node3-1-part-block-0-r-00011-snappy.parquet is incompatible with type bigint defined in table schema
```
I don't want to convert to binary, as the field is primarily a bigint field (a SNOMED code). I would appreciate any guidance on achieving what I want, and possibly even on adding better sampling to the original crawler so fields are detected correctly.
The Glue script for the conversion follows, though I used the visual editor to create the jobs.
```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
database="input_database",
table_name="input_table_name",
transformation_ctx="S3bucket_node1",
)
# Script generated for node Change Schema (Apply Mapping)
ChangeSchemaApplyMapping_node1672958072003 = ApplyMapping.apply(
frame=S3bucket_node1,
mappings=[
("id", "string", "id", "string"),
("patientid", "string", "patientid", "string"),
("providerid", "string", "providerid", "string"),
("primarypatientinsuranceid", "string", "primarypatientinsuranceid", "string"),
(
"secondarypatientinsuranceid",
"string",
"secondarypatientinsuranceid",
"string",
),
("departmentid", "long", "departmentid", "long"),
("patientdepartmentid", "long", "patientdepartmentid", "long"),
("diagnosis1", "long", "diagnosis1", "long"),
("diagnosis2", "long", "diagnosis2", "long"),
("diagnosis3", "long", "diagnosis3", "long"),
("diagnosis4", "long", "diagnosis4", "long"),
("diagnosis5", "long", "diagnosis5", "long"),
("diagnosis6", "string", "diagnosis6", "string"),
("diagnosis7", "string", "diagnosis7", "string"),
("diagnosis8", "string", "diagnosis8", "string"),
("referringproviderid", "string", "referringproviderid", "string"),
("appointmentid", "string", "appointmentid", "string"),
("currentillnessdate", "string", "currentillnessdate", "string"),
("servicedate", "string", "servicedate", "string"),
("supervisingproviderid", "string", "supervisingproviderid", "string"),
("status1", "string", "status1", "string"),
("status2", "string", "status2", "string"),
("statusp", "string", "statusp", "string"),
("outstanding1", "long", "outstanding1", "long"),
("outstanding2", "long", "outstanding2", "long"),
("outstandingp", "long", "outstandingp", "long"),
("lastbilleddate1", "string", "lastbilleddate1", "string"),
("lastbilleddate2", "string", "lastbilleddate2", "string"),
("lastbilleddatep", "string", "lastbilleddatep", "string"),
("healthcareclaimtypeid1", "long", "healthcareclaimtypeid1", "long"),
("healthcareclaimtypeid2", "long", "healthcareclaimtypeid2", "long"),
],
transformation_ctx="ChangeSchemaApplyMapping_node1672958072003",
)
# Script generated for node AWS Glue Data Catalog
AWSGlueDataCatalog_node1672991131491 = glueContext.write_dynamic_frame.from_catalog(
frame=ChangeSchemaApplyMapping_node1672958072003,
database="output_database",
table_name="output_table_name",
additional_options={
"enableUpdateCatalog": True,
"updateBehavior": "UPDATE_IN_DATABASE",
},
transformation_ctx="AWSGlueDataCatalog_node1672991131491",
)
job.commit()
```
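One possible approach, offered only as a sketch and not a verified fix for this dataset, is to resolve the mixed string/long columns explicitly with `resolveChoice` before the ApplyMapping step, so values that cannot be cast become nulls instead of surfacing as binary in the Parquet output. The column list here is illustrative:

```python
# Sketch: force the ambiguous diagnosis columns to long before mapping/writing.
resolved = S3bucket_node1.resolveChoice(
    specs=[
        ("diagnosis1", "cast:long"),
        ("diagnosis2", "cast:long"),
        ("diagnosis3", "cast:long"),
        ("diagnosis4", "cast:long"),
        ("diagnosis5", "cast:long"),
    ],
    transformation_ctx="resolved_diagnosis_types",
)
# Feed `resolved` (instead of S3bucket_node1) into ApplyMapping.apply above.
```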
I need to add jobs and triggers to a Glue workflow programmatically in Python, and I am not sure how to do it. Can anyone help?
I have tried the boto3 library: I can create a workflow and start and stop it, but I am unable to assign jobs or triggers to it.
From the link below I have seen it is possible via EventBridge, but I don't have IAM access to create any policies or roles:
https://docs.aws.amazon.com/glue/latest/dg/starting-workflow-eventbridge.html
From the link below I have seen there is a code snippet, but I am not sure where and how to execute it and which service needs to be used:
https://repost.aws/questions/QU8oIj15VUTCS7d38ocHq7nQ/how-to-create-a-glue-workflow-programmatically
I need a working step-by-step example with a code snippet.
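As a rough sketch (assuming the Glue jobs already exist; `my_workflow`, `job_a`, and `job_b` are placeholder names): jobs are attached to a workflow indirectly, by creating triggers that reference both the workflow and the jobs.

```python
import boto3

glue = boto3.client("glue")

# 1. Create the workflow container.
glue.create_workflow(Name="my_workflow", Description="Demo workflow")

# 2. Attach jobs by creating triggers that name the workflow.
#    An ON_DEMAND trigger starts job_a whenever the workflow is run.
glue.create_trigger(
    Name="start-job-a",
    WorkflowName="my_workflow",
    Type="ON_DEMAND",
    Actions=[{"JobName": "job_a"}],
)

#    A CONDITIONAL trigger runs job_b after job_a succeeds.
glue.create_trigger(
    Name="then-job-b",
    WorkflowName="my_workflow",
    Type="CONDITIONAL",
    Predicate={
        "Conditions": [
            {"LogicalOperator": "EQUALS", "JobName": "job_a", "State": "SUCCEEDED"}
        ]
    },
    Actions=[{"JobName": "job_b"}],
    StartOnCreation=True,
)

# 3. Kick off the workflow.
glue.start_workflow_run(Name="my_workflow")
```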
Using an AWS Glue interactive notebook, I notice a bug when I try to change the kernel. Sometimes it lets me click and change the kernel, but sometimes it does not. Also, when reloading the same notebook saved with another kernel and language, it always opens with the default PySpark kernel.

We use DMS to export data from MySQL to S3, after which we run ETLs. The Glue ETLs use bookmarks and hence read only what has changed since the last run. However, the raw data keeps growing as numerous kilobyte-sized files.
My plan is to:
1. Write a Glue job to read all these files and keep creating 256 MB files (a rough sketch follows below).
2. Create a retention policy on the DMS endpoint bucket to delete files older than 90 days.
The reasons:
1. For selecting 256 MB: I read somewhere that 256 MB is the preferred file size for Athena. Is that right?
2. For compacting the raw files: to make it easier for any other application to consume the data, that is, to read a small number of 256 MB files rather than millions of KB files.
What I want to know is:
* What is the general architecture around this?
* Is it a good practice to implement my steps?
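For the compaction step, a minimal sketch of the idea (the bucket names, prefixes, and file format are placeholders, and the 256 MB target is approximate since Parquet compression will shrink the output):

```python
import sys
import boto3
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext())
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Estimate total input size by summing S3 object sizes (bucket/prefix are placeholders).
s3 = boto3.client("s3")
total_bytes = 0
for page in s3.get_paginator("list_objects_v2").paginate(
    Bucket="dms-endpoint-bucket", Prefix="raw/"
):
    for obj in page.get("Contents", []):
        total_bytes += obj["Size"]

# Aim for roughly one output file per 256 MB of input.
TARGET_FILE_BYTES = 256 * 1024 * 1024
num_files = max(1, total_bytes // TARGET_FILE_BYTES)

# Read the many small files and rewrite them as a small number of larger files.
df = spark.read.parquet("s3://dms-endpoint-bucket/raw/")
df.repartition(int(num_files)).write.mode("overwrite").parquet(
    "s3://dms-endpoint-bucket/compacted/"
)

job.commit()
```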
Attempting a basic Glue example inside the latest Amazon Glue container fails with:
```shell
An error occurred while calling o47.getDynamicFrame.
: java.io.FileNotFoundException: (No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at com.amazonaws.glue.jdbc.commons.CustomCertificateManager.importCustomJDBCCert(CustomCertificateManager.java:127)
at com.amazonaws.services.glue.util.JDBCWrapper$.connectionProperties(JDBCUtils.scala:952)
at com.amazonaws.services.glue.util.JDBCWrapper.connectionProperties$lzycompute(JDBCUtils.scala:738)
at com.amazonaws.services.glue.util.JDBCWrapper.connectionProperties(JDBCUtils.scala:738)
at com.amazonaws.services.glue.util.JDBCWrapper.tableDF(JDBCUtils.scala:864)
at com.amazonaws.services.glue.util.NoCondition$.tableDF(JDBCUtils.scala:86)
at com.amazonaws.services.glue.util.NoJDBCPartitioner$.tableDF(JDBCUtils.scala:172)
at com.amazonaws.services.glue.JDBCDataSource.getDynamicFrame(DataSource.scala:967)
at com.amazonaws.services.glue.DataSource.getDynamicFrame(DataSource.scala:99)
at com.amazonaws.services.glue.DataSource.getDynamicFrame$(DataSource.scala:99)
at com.amazonaws.services.glue.SparkSQLDataSource.getDynamicFrame(DataSource.scala:714)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
```
Am I supposed to be passing something to what looks like a certificate manager of some sort, and if so, how do I do that?
I'm loading this up in a very simple way:
```python
import sys

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# Standard Glue job setup (not shown in the original snippet).
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

query = "SELECT * FROM myDataSource"  # placeholder; the real query is defined elsewhere


def spark_sql_query(glueContext, query, mapping) -> DynamicFrame:
    # Register each DynamicFrame as a temp view so Spark SQL can reference it by alias.
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = spark.sql(query)
    return DynamicFrame.fromDF(result, glueContext, "result")


def jdbc_source(glueContext, connectionName, table) -> DynamicFrame:
    # Use the stored Glue connection's properties plus a custom JDBC driver from S3.
    connection_options = {
        "useConnectionProperties": "true",
        "dbtable": table,
        "connectionName": connectionName,
        "customJdbcDriverS3Path": "s3://my_path/mysql-connector-java-5.1.49.jar",
        "customJdbcDriverClassName": "com.mysql.jdbc.Driver",
    }
    return glueContext.create_dynamic_frame.from_options(
        connection_type="mysql",
        connection_options=connection_options,
    )


def main():
    data_source = jdbc_source(
        glueContext=glueContext,
        connectionName="my_connection_name",
        table="my_table",
    )
    spark_sql_query(
        glueContext=glueContext,
        query=query,
        mapping={"myDataSource": data_source},
    ).show()
    job.commit()


main()
```