【以下的问题经过翻译处理】 你好!
我正在使用Feature Store Spark 连接器 将数据加载到 Sagemaker Feature Store中。当我们尝试将数据加载到启用在线存储的 Feature Group 时,数据会重复。在下图中,“customer_id” 是 ID 特征,“date_ref” 是事件列。对于相同的 ID 和 EventTime 列,所有特征都相同,除了 “api_invocation_time” 之外。
如果 Feature Group 没有启用在线存储,我们直接将数据加载到离线存储中,不会出现问题。但是,当我们在连接器中使用 “Ingest by default” 选项(未指定连接器中的“target_stores”,使用 PutRecord API),加载的数据会重复:
params = {
"input_data_frame":dataframe,
"feature_group_arn": feature_group_arn
}
if not online_store_enabled:
params["target_stores"] = ["OfflineStore"]
logger.info(f"Ingesting data to the offline store")
pyspark_connector.ingest_data(**params)
logger.info("Finished the ingestion!")
failed_records = pyspark_connector.get_failed_stream_ingestion_data_frame()
如何使用连接器解决这个问题?
编辑:
显然,“get_failed_stream_ingestion_data_frame”方法存在问题。该方法在返回失败记录之前会再次加载数据。从摄取管道中删除该方法可以解决此问题,但我们会失去一种验证形式。