如何从 PySpark 将数据写入 Kinesis 流?

0

【以下的问题经过翻译处理】 客户希望使用 PySpark 在 Spark Streaming 中处理流数据,并希望将结果输出到 Kinesis 流

虽然 PySpark 确实支持从 Kinesis (http://spark.apache.org/docs/latest/streaming-kinesis-integration.html) 读取数据,但我看不到对将数据写入 Kinesis 的任何支持。

DataBricks 有一些用于为 Spark (scala) 创建 Kinesis sink的文档 (https://docs.databricks.com/spark/latest/structured-streaming/kinesis.html),但如果我正确理解文档,这是基于关于 “ForeachSink”,这在PySpark 是不支持的(http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach

The foreach operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface ForeachWriter (Scala/Java docs), which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.

有没有人遇到过实现这一目标的方法?

profile picture
전문가
질문됨 6달 전16회 조회
1개 답변
0

【以下的回答经过翻译处理】 在 PySpark 中,您可以使用 forEachPartition 并为该分区调用 Kinesis 或任何外部 API,或者您也可以使用 map 并按记录调用 Kinesis。

# 方法 1:每个分区

def pushToKinesis(迭代器):
   打印(列表(迭代器)[0]
   #push to kinesis 使用 boto3 APIs

rdd.foreachPartition(pushToKinesis())

# 方法二:每条记录

def pushToKinesis(记录):
   #push to kinesis 使用 boto3 APIs

rdd.map(lambda l: pushToKinesis(l))

下面的博文使用方法 2 调用 Amazon Comprehend: https://aws.amazon.com/blogs/machine-learning/how-to-scale-sentiment-analysis-using-amazon-comprehend-aws-glue-and-amazon-athena/

profile picture
전문가
답변함 6달 전

로그인하지 않았습니다. 로그인해야 답변을 게시할 수 있습니다.

좋은 답변은 질문에 명확하게 답하고 건설적인 피드백을 제공하며 질문자의 전문적인 성장을 장려합니다.

질문 답변하기에 대한 가이드라인