跳至內容

如何自動將資料同步到 Amazon Bedrock?

3 分的閱讀內容
0

我希望自動化 Amazon Bedrock 知識庫的資料同步功能。

簡短描述

使用以檢索增強生成 (RAG) 為基礎方法來開發 AI 應用程式的組織,必須確保其知識庫與資料保持同步。若要自動執行資料更新,您可以使用 StartIngestionJob API。

先決條件:

  • 具有適當權限的 AWS 帳戶。
  • 熟悉您偏好的程式語言所對應的 AWS SDK

**注意:**如果您在執行 AWS Command Line Interface (AWS CLI) 命令時收到錯誤,請參閱AWS CLI 錯誤疑難排解。此外,請確定您使用的是最新的 AWS CLI 版本

解決方法

使用 IngestionJob API

請完成下列步驟:

  1. 請為您偏好的程式語言設定 AWS SDK。或者,如果您使用 AWS CLI,則使用您的憑證來設定 AWS CLI

  2. 若要尋找您的知識庫 ID,請執行 list-knowledge-bases AWS CLI 命令:

    aws bedrock-agent list-knowledge-bases --region your-region-name

    **注意:**將 your-region-name 替換為您的 AWS 區域。

  3. 若要尋找資料來源 ID,請執行 list-data-sources AWS CLI 命令:

    aws bedrock-agent list-data-sources --knowledge-base-id your-knowledge-base-id --region your-region-name

    **注意:**將 your-region-name 替換為您的 AWS 區域,並將 your-knowledge-base-id 替換為您的知識庫 ID。

  4. 執行 StartIngestionJob API:

    SDK_BedrockAgent_Client.StartIngestionJob(
      --knowledge-base-id your-knowledge-base-id
      --data-source-id your-data-source-id)

    **注意:**API 可能會因您所使用的程式語言而有所不同。或者,如果您使用 AWS CLI,API 也可能會有所不同。以下是使用 Python 和 boto3 的範例:

    import boto3from botocore.exceptions import ClientError
    def start_ingestion_job(knowledge_base_id, data_source_id):
        bedrock = boto3.client('bedrock-agent', region_name='your-region')
        try:
            response = bedrock.start_ingestion_job(
                knowledgeBaseId=knowledge_base_id,
                dataSourceId=data_source_id        )
            return response    except ClientError as e:
            print(f"An error occurred: {e}")
            return None
    
    --# Usage
    knowledge_base_id = 'your-knowledge-base-id'
    data_source_id = 'your-data-source-id'
    job_response = start_ingestion_job(knowledge_base_id, data_source_id)
    
    if job_response:
        print(f"Ingestion job started successfully. Job ID: {job_response['ingestionJob']['ingestionJobId']}")
    else:
        print("Failed to start ingestion job.")
  5. 從輸出中,記下 ingestionJobId

  6. 若要檢查擷取作業的狀態,請執行 GetIngestionJob API:

    SDK_BedrockAgent_Client.GetIngestionJob(
      --knowledge-base-id your-knowledge-base-id
      --data-source-id your-data-source-id
      --ingestion-job-id your-ingestion-job-id)

    **注意:**API 可能會因您所使用的程式語言而有所不同。或者,如果您使用 AWS CLI,API 也可能會有所不同。以下是使用 Python 和 boto3 的範例:

    def check_ingestion_job_status(knowledge_base_id, data_source_id, ingestion_job_id):
        bedrock = boto3.client('bedrock-agent', region_name='your-region')
        try:
            response = bedrock.get_ingestion_job(
                knowledgeBaseId=knowledge_base_id,
                dataSourceId = data_source_id,
                ingestionJobId=ingestion_job_id        )
            return response['ingestionJob']['status']
        except ClientError as e:
            print(f"An error occurred: {e}")        return None
    
    --# Usage
    if job_response:
        status = check_ingestion_job_status(knowledge_base_id, data_source_id, ingestion_job_id)
        print(f"Current ingestion job status: {status}")

使用偽程式碼將您的資料推送到知識庫

使用偽程式碼從所有可用的資料來源更新您的知識庫資料。

範例:

Function StartJob(knowledgeBaseId, dataSourceId)
    Try        job = BedrockAgentService.StartIngestionJob(knowledgeBaseId, dataSourceId)
        Return job
    Catch Error        LogError("Failed to start ingestion job for data source: " + dataSourceId)
        Return null
Function GetIngestionJobStatus(knowledgeBaseId, dataSourceId, ingestionJobId)
    Try        jobStatus = BedrockAgentService.GetIngestionJob(knowledgeBaseId, dataSourceId, ingestionJobId)
        Return jobStatus
    Catch Error        LogError("Failed to get status for job: " + ingestionJobId)
        Return null
Function RunIngestionJobs(knowledgeBaseId)
    dataSources = BedrockAgentService.ListDataSources(knowledgeBaseId)

    For Each dataSource in dataSources        job = StartJob(knowledgeBaseId, dataSource.Id)

        If job is not null Then            LogInfo("Job started successfully for data source: " + dataSource.Id)

            While job.Status is not (Completed or Failed or Stopped)
                Wait for short interval                job = GetIngestionJobStatus(knowledgeBaseId, dataSource.Id, job.Id)

                If job is null Then                    Break While loop
                        If job is not null Then                LogInfo("Job completed with status: " + job.Status)
            Else                LogError("Job monitoring failed for data source: " + dataSource.Id)
        Else            LogError("Failed to start job for data source: " + dataSource.Id)

Main    knowledgeBaseId = "<your-knowledge-base-id"
    RunIngestionJobs(knowledgeBaseId)

程式碼定義了三個主要函式:

  • StartJob 函式使用 StartIngestionJob API,為您提供的知識庫和資料來源啟動擷取作業。
  • GetIngestionJobStatus 函式會取得您所提供擷取作業的目前狀態。
  • RunIngestionJobs 函式會啟動並監控您所提供知識庫中,各資料來源的擷取作業。

**注意:**如果作業失敗,請查看錯誤訊息以了解更多詳細資訊。

遵循最佳實務

為了減少將資料同步到 Amazon Bedrock 時可能遇到的問題,請完成以下最佳實務: