New user sign up using AWS Builder ID
New user sign up using AWS Builder ID is currently unavailable on re:Post. To sign up, please use the AWS Management Console instead.
如何自动将我的数据同步到 Amazon Bedrock?
我想自动同步我的 Amazon Bedrock 知识库的数据。
简短描述
在其 AI 应用程序中使用基于检索增强生成 (RAG) 的方法的组织必须使其知识库与数据保持同步。要自动更新数据,您可以使用 StartIngestionJob API。
先决条件:
- 具有适当权限的 AWS 账户。
- 熟悉适用于您的首选编程语言的 AWS SDK。
**注意:**如果在运行 AWS 命令行界面 (AWS CLI) 命令时收到错误,请参阅 AWS CLI 错误故障排除。此外,请确保您使用的是最新的 AWS CLI 版本。
解决方法
使用 IngestionJob API
完成以下步骤:
-
为您的首选编程语言配置 AWS SDK。或者,如果您使用 AWS CLI,请使用您的凭证配置 AWS CLI。
-
要查找您的知识库 ID,请运行 list-knowledge-bases AWS CLI 命令:
aws bedrock-agent list-knowledge-bases --region your-region-name
**注意:**将 your-region-name 替换为您的 AWS 区域。
-
要查找您的数据源 ID,请运行 list-data-sources AWS CLI 命令:
aws bedrock-agent list-data-sources --knowledge-base-id your-knowledge-base-id --region your-region-name
**注意:**将 your-region-name 替换为您的 AWS 区域,将 your-knowledge-base-id 替换为您的知识库 ID。
-
运行 StartIngestionJob API:
SDK_BedrockAgent_Client.StartIngestionJob( --knowledge-base-id your-knowledge-base-id --data-source-id your-data-source-id)
**注意:**根据您使用的程序语言,API 的外观可能会有所不同。或者,如果您使用 AWS CLI,情况可能会有所不同。以下是使用 Python 和 boto3 的示例:
import boto3from botocore.exceptions import ClientError def start_ingestion_job(knowledge_base_id, data_source_id): bedrock = boto3.client('bedrock-agent', region_name='your-region') try: response = bedrock.start_ingestion_job( knowledgeBaseId=knowledge_base_id, dataSourceId=data_source_id ) return response except ClientError as e: print(f"An error occurred: {e}") return None --# Usage knowledge_base_id = 'your-knowledge-base-id' data_source_id = 'your-data-source-id' job_response = start_ingestion_job(knowledge_base_id, data_source_id) if job_response: print(f"Ingestion job started successfully. Job ID: {job_response['ingestionJob']['ingestionJobId']}") else: print("Failed to start ingestion job.")
-
在输出中,记下 ingestionJobId。
-
要检查摄取作业的状态,请运行 GetIngestionJob API:
SDK_BedrockAgent_Client.GetIngestionJob( --knowledge-base-id your-knowledge-base-id --data-source-id your-data-source-id --ingestion-job-id your-ingestion-job-id)
**注意:**根据您使用的程序语言,API 的外观可能会有所不同。或者,如果您使用 AWS CLI,情况可能会有所不同。以下是使用 Python 和 boto3 的示例:
def check_ingestion_job_status(knowledge_base_id, data_source_id, ingestion_job_id): bedrock = boto3.client('bedrock-agent', region_name='your-region') try: response = bedrock.get_ingestion_job( knowledgeBaseId=knowledge_base_id, dataSourceId = data_source_id, ingestionJobId=ingestion_job_id ) return response['ingestionJob']['status'] except ClientError as e: print(f"An error occurred: {e}") return None --# Usage if job_response: status = check_ingestion_job_status(knowledge_base_id, data_source_id, ingestion_job_id) print(f"Current ingestion job status: {status}")
使用伪代码将数据推送到知识库
使用伪代码更新知识库中所有可用数据源的数据。
示例:
Function StartJob(knowledgeBaseId, dataSourceId) Try job = BedrockAgentService.StartIngestionJob(knowledgeBaseId, dataSourceId) Return job Catch Error LogError("Failed to start ingestion job for data source: " + dataSourceId) Return null Function GetIngestionJobStatus(knowledgeBaseId, dataSourceId, ingestionJobId) Try jobStatus = BedrockAgentService.GetIngestionJob(knowledgeBaseId, dataSourceId, ingestionJobId) Return jobStatus Catch Error LogError("Failed to get status for job: " + ingestionJobId) Return null Function RunIngestionJobs(knowledgeBaseId) dataSources = BedrockAgentService.ListDataSources(knowledgeBaseId) For Each dataSource in dataSources job = StartJob(knowledgeBaseId, dataSource.Id) If job is not null Then LogInfo("Job started successfully for data source: " + dataSource.Id) While job.Status is not (Completed or Failed or Stopped) Wait for short interval job = GetIngestionJobStatus(knowledgeBaseId, dataSource.Id, job.Id) If job is null Then Break While loop If job is not null Then LogInfo("Job completed with status: " + job.Status) Else LogError("Job monitoring failed for data source: " + dataSource.Id) Else LogError("Failed to start job for data source: " + dataSource.Id) Main knowledgeBaseId = "<your-knowledge-base-id" RunIngestionJobs(knowledgeBaseId)
该代码定义了三个主要函数:
- StartJob 函数使用 StartIngestionJob API 为您提供的知识库和数据源启动摄取作业。
- GetIngestionJobStatus 函数获取您提供的摄取作业的当前状态。
- RunIngestionJobs 函数启动并监控您提供的知识库中数据源的摄取作业。
**注意:**如果操作失败,请查看您的错误消息以获取更多详细信息。
遵循最佳实践
为了减少将数据同步到 Amazon Bedrock 时出现的问题,请完成以下最佳实践:
- 要管理 API 调用问题,请在整个过程中实施错误处理。
- 要定期检查作业的状态,请使用轮询机制。有关详细信息,请参阅使用 Lambda 和 AWS Batch 轮询作业状态。
- 保留摄取过程的详细日志,以进行故障排除和审计。
- 遵循 AWS 安全最佳实践。
- 查看数据摄取和存储成本。

相关内容
- 已提问 4 个月前lg...
- AWS 官方已更新 1 年前
- AWS 官方已更新 10 个月前
- AWS 官方已更新 10 个月前
- AWS 官方已更新 3 个月前