I am trying to migrate data from a CSV file in S3 to a table in my Redshift cluster. As a reference I used the code that was auto-generated after I built the job with blocks in AWS Glue's Visual mode; that job ran successfully.
I copied that code into the Spark script for a different job and changed only basic details such as variable and table names. FYI, I have granted all the necessary permissions, attached the policies required for the job to run, and set up the VPC and security group rules.
Here is my code:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 bucket
node1 = glueContext.create_dynamic_frame.from_options(
    format_options={"quoteChar": '"', "withHeader": True, "separator": ","},
    connection_type="s3",
    format="csv",
    connection_options={"paths": ["s3://testbucket-mohit/Source/AMH.csv"]},
    transformation_ctx="S3bucket_node1",
)

# Script generated for node Amazon Redshift
node2 = glueContext.write_dynamic_frame.from_options(
    frame=node1,
    connection_type="redshift",
    connection_options={
        "redshiftTmpDir": "s3://aws-glue-assets-765579251507-ap-south-1/temporary/",
        "useConnectionProperties": "true",
        "dbtable": "public.amh2",
        "connectionName": "redshift",
        "preactions": "DROP TABLE IF EXISTS public.amh2; CREATE TABLE IF NOT EXISTS public.amh2 (id VARCHAR, owner_name VARCHAR, property_name VARCHAR, address_line1 VARCHAR, city VARCHAR, state VARCHAR, zipcode VARCHAR, country VARCHAR, square_feet VARCHAR, property_type VARCHAR, year_built VARCHAR, url VARCHAR, cityurl VARCHAR);",
    },
    transformation_ctx="node2",
)

job.commit()
The data loads into the dynamic frame successfully; however, when it comes to writing the data to Redshift, the job throws the error below.
23/10/10 09:07:56 INFO LogPusher: stopping
23/10/10 09:07:56 INFO ProcessLauncher: postprocessing
23/10/10 09:07:56 ERROR ProcessLauncher: Error from Python:Traceback (most recent call last):
File "/tmp/testrun.py", line 26, in <module>
node2 = glueContext.write_dynamic_frame.from_options(
File "/opt/amazon/lib/python3.7/site-packages/awsglue/dynamicframe.py", line 640, in from_options
return self._glue_context.write_dynamic_frame_from_options(frame,
File "/opt/amazon/lib/python3.7/site-packages/awsglue/context.py", line 337, in write_dynamic_frame_from_options
return self.write_from_options(frame, connection_type,
File "/opt/amazon/lib/python3.7/site-packages/awsglue/context.py", line 355, in write_from_options
sink = self.getSink(connection_type, format, transformation_ctx, **new_options)
File "/opt/amazon/lib/python3.7/site-packages/awsglue/context.py", line 317, in getSink
j_sink = self._ssql_ctx.getSink(connection_type,
File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
return f(*a, **kw)
File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o86.getSink.
: java.sql.SQLException: The connection attempt failed.
at com.amazon.redshift.util.RedshiftException.getSQLException(RedshiftException.java:56)
at com.amazon.redshift.Driver.connect(Driver.java:319)
at com.amazonaws.services.glue.util.JDBCWrapper$.$anonfun$connectionProperties$5(JDBCUtils.scala:1061)
at com.amazonaws.services.glue.util.JDBCWrapper$.$anonfun$connectWithSSLAttempt$2(JDBCUtils.scala:1012)
at scala.Option.getOrElse(Option.scala:189)
at com.amazonaws.services.glue.util.JDBCWrapper$.$anonfun$connectWithSSLAttempt$1(JDBCUtils.scala:1012)
at scala.Option.getOrElse(Option.scala:189)
at com.amazonaws.services.glue.util.JDBCWrapper$.connectWithSSLAttempt(JDBCUtils.scala:1012)
at com.amazonaws.services.glue.util.JDBCWrapper$.connectionProperties(JDBCUtils.scala:1057)
at com.amazonaws.services.glue.util.JDBCWrapper.connectionProperties$lzycompute(JDBCUtils.scala:820)
at com.amazonaws.services.glue.util.JDBCWrapper.connectionProperties(JDBCUtils.scala:820)
at com.amazonaws.services.glue.util.JDBCWrapper.getRawConnection(JDBCUtils.scala:833)
at com.amazonaws.services.glue.RedshiftDataSink.<init>(RedshiftDataSink.scala:39)
at com.amazonaws.services.glue.GlueContext.getSink(GlueContext.scala:1121)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at com.amazon.redshift.core.RedshiftStream.<init>(RedshiftStream.java:86)
at com.amazon.redshift.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:111)
at com.amazon.redshift.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:224)
at com.amazon.redshift.core.ConnectionFactory.openConnection(ConnectionFactory.java:51)
at com.amazon.redshift.jdbc.RedshiftConnectionImpl.<init>(RedshiftConnectionImpl.java:328)
at com.amazon.redshift.Driver.makeConnection(Driver.java:474)
at com.amazon.redshift.Driver.connect(Driver.java:295)
... 24 more
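The root cause in the trace is the last line: java.net.SocketTimeoutException: connect timed out, i.e. the Glue driver never reaches the Redshift endpoint at the TCP level, before any authentication happens. As a quick sanity check, a plain socket probe run from the same network context (e.g. a notebook cell in the job's VPC) shows whether the host and port are reachable at all. This is just a minimal sketch; the endpoint name in the comment is a hypothetical placeholder:

```python
import socket

def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers timeouts, refusals, and DNS failures
        return False

# Hypothetical cluster endpoint; Redshift listens on 5439 by default:
# can_connect("my-cluster.xxxx.ap-south-1.redshift.amazonaws.com", 5439)
```

If this returns False from inside the job's subnet, the problem is security groups or routing rather than the Glue script itself.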
I even tried running the cell below in an AWS Glue Jupyter notebook, but it still gives the same error.
node2 = glueContext.write_dynamic_frame.from_options(
    frame=node1,
    connection_type="redshift",
    connection_options={
        "redshiftTmpDir": "s3://aws-glue-assets-765579251507-ap-south-1/temporary/",
        "useConnectionProperties": "true",
        "dbtable": "public.amh2",
        "connectionName": "redshift",
        "preactions": "DROP TABLE IF EXISTS public.amh2; CREATE TABLE IF NOT EXISTS public.amh2 (id VARCHAR, owner_name VARCHAR, property_name VARCHAR, address_line1 VARCHAR, city VARCHAR, state VARCHAR, zipcode VARCHAR, country VARCHAR, square_feet VARCHAR, property_type VARCHAR, year_built VARCHAR, url VARCHAR, cityurl VARCHAR);",
    },
    transformation_ctx="node2",
)
Am I missing something here? Thank you in advance.
Thanks, I was able to fix my silly issue. In my case, IAM permissions were missing. This doc helped: https://docs.aws.amazon.com/redshift/latest/mgmt/redshift-iam-authentication-access-control.html#redshift-iam-accesscontrol
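For anyone hitting the same thing: the linked doc covers IAM-based authentication to Redshift via redshift:GetClusterCredentials. A minimal policy sketch for the job's role, assuming temporary-credential (IAM) authentication is in use (the account ID, cluster, user, and database names in the ARNs are hypothetical placeholders), looks roughly like:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "redshift:GetClusterCredentials",
      "Resource": [
        "arn:aws:redshift:ap-south-1:123456789012:dbuser:my-cluster/my_user",
        "arn:aws:redshift:ap-south-1:123456789012:dbname:my-cluster/dev"
      ]
    }
  ]
}
```

The exact actions and resources needed depend on how the Glue connection authenticates, so check the doc against your own setup.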
No, the problem is not with creating the dynamic frame but with writing it to the Redshift table. The S3 directory for temporary storage has already been provided, and the IAM permissions and connectivity between all three services were already established, since the ETL job built visually (not via script) runs successfully.
The error only comes up when I create a job from a Spark script, copying the template that was auto-generated for the successful visually-built ETL job. I tried the same thing in a Jupyter notebook, but the error persists.
Hmm, interesting. Did you check the network configuration on the Redshift cluster side, i.e. whether the security group and the routing in the private subnet allow connections to Redshift? It sounds like you only have an issue when writing to Redshift.
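If it does turn out to be network, the usual fix is on the security groups: the Redshift cluster's SG must allow inbound traffic on its port (5439 by default) from the SG attached to the Glue connection, and the Glue SG needs a self-referencing rule for all TCP traffic. A sketch with hypothetical group IDs, not commands to paste verbatim:

```shell
# Allow the Glue connection's SG to reach the Redshift cluster on 5439.
aws ec2 authorize-security-group-ingress \
    --group-id sg-0redshift0example \
    --protocol tcp --port 5439 \
    --source-group sg-0glue0example

# Glue also needs a self-referencing all-TCP rule on its own SG.
aws ec2 authorize-security-group-ingress \
    --group-id sg-0glue0example \
    --protocol tcp --port 0-65535 \
    --source-group sg-0glue0example
```

Also confirm the private subnet's route table covers S3 access (e.g. an S3 gateway endpoint or a NAT), since the write goes through redshiftTmpDir.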