Ongoing service disruptions
For the most recent update on ongoing service disruptions affecting the AWS Middle East (UAE) Region (ME-CENTRAL-1), refer to the AWS Health Dashboard. For information on AWS Service migration, see How do I migrate my services to another region?
如何自動將資料同步到 Amazon Bedrock?
我希望自動化 Amazon Bedrock 知識庫的資料同步功能。
簡短描述
使用以檢索增強生成 (RAG) 為基礎方法來開發 AI 應用程式的組織,必須確保其知識庫與資料保持同步。若要自動執行資料更新,您可以使用 StartIngestionJob API。
先決條件:
- 具有適當權限的 AWS 帳戶。
- 熟悉您偏好的程式語言所對應的 AWS SDK。
**注意:**如果您在執行 AWS Command Line Interface (AWS CLI) 命令時收到錯誤,請參閱AWS CLI 錯誤疑難排解。此外,請確定您使用的是最新的 AWS CLI 版本。
解決方法
使用 IngestionJob API
請完成下列步驟:
-
請為您偏好的程式語言設定 AWS SDK。或者,如果您使用 AWS CLI,則使用您的憑證來設定 AWS CLI。
-
若要尋找您的知識庫 ID,請執行 list-knowledge-bases AWS CLI 命令:
aws bedrock-agent list-knowledge-bases --region your-region-name**注意:**將 your-region-name 替換為您的 AWS 區域。
-
若要尋找資料來源 ID,請執行 list-data-sources AWS CLI 命令:
aws bedrock-agent list-data-sources --knowledge-base-id your-knowledge-base-id --region your-region-name**注意:**將 your-region-name 替換為您的 AWS 區域,並將 your-knowledge-base-id 替換為您的知識庫 ID。
-
執行 StartIngestionJob API:
SDK_BedrockAgent_Client.StartIngestionJob( --knowledge-base-id your-knowledge-base-id --data-source-id your-data-source-id)**注意:**API 可能會因您所使用的程式語言而有所不同。或者,如果您使用 AWS CLI,API 也可能會有所不同。以下是使用 Python 和 boto3 的範例:
import boto3from botocore.exceptions import ClientError def start_ingestion_job(knowledge_base_id, data_source_id): bedrock = boto3.client('bedrock-agent', region_name='your-region') try: response = bedrock.start_ingestion_job( knowledgeBaseId=knowledge_base_id, dataSourceId=data_source_id ) return response except ClientError as e: print(f"An error occurred: {e}") return None --# Usage knowledge_base_id = 'your-knowledge-base-id' data_source_id = 'your-data-source-id' job_response = start_ingestion_job(knowledge_base_id, data_source_id) if job_response: print(f"Ingestion job started successfully. Job ID: {job_response['ingestionJob']['ingestionJobId']}") else: print("Failed to start ingestion job.") -
從輸出中,記下 ingestionJobId。
-
若要檢查擷取作業的狀態,請執行 GetIngestionJob API:
SDK_BedrockAgent_Client.GetIngestionJob( --knowledge-base-id your-knowledge-base-id --data-source-id your-data-source-id --ingestion-job-id your-ingestion-job-id)**注意:**API 可能會因您所使用的程式語言而有所不同。或者,如果您使用 AWS CLI,API 也可能會有所不同。以下是使用 Python 和 boto3 的範例:
def check_ingestion_job_status(knowledge_base_id, data_source_id, ingestion_job_id): bedrock = boto3.client('bedrock-agent', region_name='your-region') try: response = bedrock.get_ingestion_job( knowledgeBaseId=knowledge_base_id, dataSourceId = data_source_id, ingestionJobId=ingestion_job_id ) return response['ingestionJob']['status'] except ClientError as e: print(f"An error occurred: {e}") return None --# Usage if job_response: status = check_ingestion_job_status(knowledge_base_id, data_source_id, ingestion_job_id) print(f"Current ingestion job status: {status}")
使用偽程式碼將您的資料推送到知識庫
使用偽程式碼從所有可用的資料來源更新您的知識庫資料。
範例:
Function StartJob(knowledgeBaseId, dataSourceId) Try job = BedrockAgentService.StartIngestionJob(knowledgeBaseId, dataSourceId) Return job Catch Error LogError("Failed to start ingestion job for data source: " + dataSourceId) Return null Function GetIngestionJobStatus(knowledgeBaseId, dataSourceId, ingestionJobId) Try jobStatus = BedrockAgentService.GetIngestionJob(knowledgeBaseId, dataSourceId, ingestionJobId) Return jobStatus Catch Error LogError("Failed to get status for job: " + ingestionJobId) Return null Function RunIngestionJobs(knowledgeBaseId) dataSources = BedrockAgentService.ListDataSources(knowledgeBaseId) For Each dataSource in dataSources job = StartJob(knowledgeBaseId, dataSource.Id) If job is not null Then LogInfo("Job started successfully for data source: " + dataSource.Id) While job.Status is not (Completed or Failed or Stopped) Wait for short interval job = GetIngestionJobStatus(knowledgeBaseId, dataSource.Id, job.Id) If job is null Then Break While loop If job is not null Then LogInfo("Job completed with status: " + job.Status) Else LogError("Job monitoring failed for data source: " + dataSource.Id) Else LogError("Failed to start job for data source: " + dataSource.Id) Main knowledgeBaseId = "<your-knowledge-base-id" RunIngestionJobs(knowledgeBaseId)
程式碼定義了三個主要函式:
- StartJob 函式使用 StartIngestionJob API,為您提供的知識庫和資料來源啟動擷取作業。
- GetIngestionJobStatus 函式會取得您所提供擷取作業的目前狀態。
- RunIngestionJobs 函式會啟動並監控您所提供知識庫中,各資料來源的擷取作業。
**注意:**如果作業失敗,請查看錯誤訊息以了解更多詳細資訊。
遵循最佳實務
為了減少將資料同步到 Amazon Bedrock 時可能遇到的問題,請完成以下最佳實務:
- 為了管理 API 呼叫時可能出現的問題,請在整個程序中實作錯誤處理。
- 若要定期檢查作業的狀態,請使用輪詢機制。如需詳細資訊,請參閱使用 Lambda 和 AWS Batch 輪詢作業狀態。
- 維護詳細的擷取程序日誌,以利疑難排解與稽核。
- 遵循 AWS 安全最佳實務。
- 審查資料擷取與儲存成本。
- 語言
- 中文 (繁體)

相關內容
- 已提問 8 個月前
- 已提問 2 年前
