如何在 EMR 中使用 Spark 连接到 Redshift 集群?
我想要在 Amazon EMR 集群中使用 Apache Spark 连接到 Amazon Redshift 集群。
解决方法
**注意:**配置 Redshift 集群和 EMR 集群,并安装 Spark 服务,然后再继续执行后续步骤。
测试 EMR 集群到 Redshift 集群的连接。
1. 验证 TCP 端口 5439 的 Redshift 安全组(入站规则)中是否允许 EMR 主节点、核心节点和任务节点安全组。如果在两个不同的 Amazon Virtual Private Cloud(Amazon VPC)中部署 EMR 和 Redshift 集群,请配置 VPC 对等连接。
2. 使用 SSH 连接到 EMR 主节点,然后运行以下 Telnet 命令。此 Telnet 命令可以验证您是否可在 EMR 集群与 Redshift 集群之间建立连接。在以下命令中,将 Redshift_Endpoint 替换为 Redshift 集群的正确端点。
telnet Redshift_Endpoint 5439
以下是成功连接的示例输出:
telnet redshift-cluster-1.XXXX.us-east-1.redshift.amazonaws.com 5439 Trying 172.31.48.21... Connected to redshift-cluster-1.XXXXX.us-east-1.redshift.amazonaws.com. Escape character is
在 EMR-5.x.x 系列集群中使用 Spark 连接到 Redshift 集群
使用 Databrick 的 spark-redshift 包(库)。该库会将数据从 Amazon Redshift 加载到 Spark SQL DataFrames,并且还会将 DataFrames 保存回 Amazon Redshift 表。
2. 要使用 spark-redshift 库,请将以下 .jar 文件下载到 EMR 集群:
wget https://repo1.maven.org/maven2/com/databricks/spark-redshift_2.11/2.0.1/spark-redshift_2.11-2.0.1.jar wget https://github.com/ralfstx/minimal-json/releases/download/0.9.4/minimal-json-0.9.4.jar
3. 将下载的 JAR 文件复制到默认 Spark 库。Spark 库的路径为 /usr/lib/spark/jars/。
sudo cp spark-redshift_2.11-2.0.1.jar /usr/lib/spark/jars/ sudo cp minimal-json-0.9.4.jar /usr/lib/spark/jars/
4. 使用 Amazon Redshift JDBC 驱动程序运行 spark-shell 命令,以连接到 Redshift 集群。Amazon EMR 版本 4.7.0 及更高版本中包含 JDBC 驱动程序。
spark-shell --jars /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar
5. 初始化 spark-shell 会话之后,运行以下步骤以连接到 Redshift 集群。在以下命令中,根据您的使用案例更新 Amazon Redshift 端点、Amazon Simple Storage Service(Amazon S3)存储桶名称和表详细信息。
import com.amazonaws.auth.AWSCredentialsProvider import com.amazonaws.auth.AWSSessionCredentials import com.amazonaws.auth.InstanceProfileCredentialsProvider // Instance Profile for authentication to AWS resources val provider = new InstanceProfileCredentialsProvider(); val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials]; val token = credentials.getSessionToken; val awsAccessKey = credentials.getAWSAccessKeyId; val awsSecretKey = credentials.getAWSSecretKey // Set JDBC URL of Redshift val jdbcUrl = "jdbc:redshift://<cluster-name>.<id>.<region>.redshift.amazonaws.com:5439/<database>?user=<user>&password=<password>" // Create DataFrame by loading Redshift query val df = spark.read.format("com.databricks.spark.redshift").option("url", jdbcUrl).option("tempdir", "s3://<S3-path-to-store-temp-data>").option("query", "select * from <table-name>").option("temporary_aws_access_key_id", awsAccessKey).option("temporary_aws_secret_access_key", awsSecretKey).option("temporary_aws_session_token", token).load() df.show(2)
在 Amazon EMR-6.x.x 系列集群中使用 Spark 连接到 Redshift 集群
Amazon EMR 版本 6.x 及更高版本使用 Scala 版本 2.12。Amazon EMR 5.x 使用 Scala 版本 2.11。Amazon EMR 5.x 使用的 spark-redshift_2.11-2.0.1.jar 文件与 Amazon EMR 版本 6.x 及更高版本不兼容。因此,在 Amazon EMR 6.x 及更高版本的集群中,请使用 spark-redshift_2.12-4.2.0.jar 连接器。
2. 要使用 spark-redshift 库,请将以下 .jar 文件下载到 EMR 集群:
wget https://repo1.maven.org/maven2/io/github/spark-redshift-community/spark-redshift_2.12/4.2.0/spark-redshift_2.12-4.2.0.jar wget https://github.com/ralfstx/minimal-json/releases/download/0.9.4/minimal-json-0.9.4.jar
3. 将下载的 JAR 文件复制到默认 Spark 库。Spark 库的路径为 /usr/lib/spark/jars/。
sudo cp spark-redshift_2.12-4.2.0.jar /usr/lib/spark/jars/ sudo cp minimal-json-0.9.4.jar /usr/lib/spark/jars/
4. 使用 Amazon Redshift JDBC 驱动程序运行 spark-shell 命令,以连接到 Redshift 集群。EMR 版本 4.7.0 及更高版本中包含 JDBC 驱动程序。
spark-shell --jars /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar
5. 初始化 spark-shell 会话之后,运行以下步骤以连接到 Redshift 集群。在以下命令中,根据您的使用案例更新 Amazon Redshift 端点、S3 存储桶名称和表详细信息。
import com.amazonaws.auth.AWSCredentialsProvider import com.amazonaws.auth.AWSSessionCredentials import com.amazonaws.auth.InstanceProfileCredentialsProvider // Instance Profile for authentication to AWS resources val provider = new InstanceProfileCredentialsProvider(); val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials]; val token = credentials.getSessionToken; val awsAccessKey = credentials.getAWSAccessKeyId; val awsSecretKey = credentials.getAWSSecretKey // Set JDBC URL of Redshift val jdbcUrl = "jdbc:redshift://<cluster-name>.<id>.<region>.redshift.amazonaws.com:5439/<database>?user=<user>&password=<password>" // Create DataFrame by loading Redshift query val df = spark.read.format("io.github.spark_redshift_community.spark.redshift").option("url", jdbcUrl).option("tempdir", "s3://bucket/tmp/").option("query", "select * from <table>").option("temporary_aws_access_key_id", awsAccessKey).option("temporary_aws_secret_access_key", awsSecretKey).option("temporary_aws_session_token", token).load() df.show(2)
相关内容
- AWS 官方已更新 10 个月前
- AWS 官方已更新 2 年前
- AWS 官方已更新 2 年前
- AWS 官方已更新 1 年前