如何自动将我的数据同步到 Amazon Bedrock?

3 分钟阅读
0

我想自动同步我的 Amazon Bedrock 知识库的数据。

简短描述

在其 AI 应用程序中使用基于检索增强生成 (RAG) 的方法的组织必须使其知识库与数据保持同步。要自动更新数据,您可以使用 StartIngestionJob API。

先决条件:

  • 具有适当权限的 AWS 账户。
  • 熟悉适用于您的首选编程语言的 AWS SDK

**注意:**如果在运行 AWS 命令行界面 (AWS CLI) 命令时收到错误,请参阅 AWS CLI 错误故障排除。此外,请确保您使用的是最新的 AWS CLI 版本

解决方法

使用 IngestionJob API

完成以下步骤:

  1. 为您的首选编程语言配置 AWS SDK。或者,如果您使用 AWS CLI,请使用您的凭证配置 AWS CLI

  2. 要查找您的知识库 ID,请运行 list-knowledge-bases AWS CLI 命令:

    aws bedrock-agent list-knowledge-bases --region your-region-name

    **注意:**将 your-region-name 替换为您的 AWS 区域。

  3. 要查找您的数据源 ID,请运行 list-data-sources AWS CLI 命令:

    aws bedrock-agent list-data-sources --knowledge-base-id your-knowledge-base-id --region your-region-name

    **注意:**将 your-region-name 替换为您的 AWS 区域,将 your-knowledge-base-id 替换为您的知识库 ID。

  4. 运行 StartIngestionJob API:

    SDK_BedrockAgent_Client.StartIngestionJob(
      --knowledge-base-id your-knowledge-base-id
      --data-source-id your-data-source-id)

    **注意:**根据您使用的程序语言,API 的外观可能会有所不同。或者,如果您使用 AWS CLI,情况可能会有所不同。以下是使用 Python 和 boto3 的示例:

    import boto3from botocore.exceptions import ClientError
    def start_ingestion_job(knowledge_base_id, data_source_id):
        bedrock = boto3.client('bedrock-agent', region_name='your-region')
        try:
            response = bedrock.start_ingestion_job(
                knowledgeBaseId=knowledge_base_id,
                dataSourceId=data_source_id        )
            return response    except ClientError as e:
            print(f"An error occurred: {e}")
            return None
    
    --# Usage
    knowledge_base_id = 'your-knowledge-base-id'
    data_source_id = 'your-data-source-id'
    job_response = start_ingestion_job(knowledge_base_id, data_source_id)
    
    if job_response:
        print(f"Ingestion job started successfully. Job ID: {job_response['ingestionJob']['ingestionJobId']}")
    else:
        print("Failed to start ingestion job.")
  5. 在输出中,记下 ingestionJobId

  6. 要检查摄取作业的状态,请运行 GetIngestionJob API:

    SDK_BedrockAgent_Client.GetIngestionJob(
      --knowledge-base-id your-knowledge-base-id
      --data-source-id your-data-source-id
      --ingestion-job-id your-ingestion-job-id)

    **注意:**根据您使用的程序语言,API 的外观可能会有所不同。或者,如果您使用 AWS CLI,情况可能会有所不同。以下是使用 Python 和 boto3 的示例:

    def check_ingestion_job_status(knowledge_base_id, data_source_id, ingestion_job_id):
        bedrock = boto3.client('bedrock-agent', region_name='your-region')
        try:
            response = bedrock.get_ingestion_job(
                knowledgeBaseId=knowledge_base_id,
                dataSourceId = data_source_id,
                ingestionJobId=ingestion_job_id        )
            return response['ingestionJob']['status']
        except ClientError as e:
            print(f"An error occurred: {e}")        return None
    
    --# Usage
    if job_response:
        status = check_ingestion_job_status(knowledge_base_id, data_source_id, ingestion_job_id)
        print(f"Current ingestion job status: {status}")

使用伪代码将数据推送到知识库

使用伪代码更新知识库中所有可用数据源的数据。

示例:

Function StartJob(knowledgeBaseId, dataSourceId)
    Try        job = BedrockAgentService.StartIngestionJob(knowledgeBaseId, dataSourceId)
        Return job
    Catch Error        LogError("Failed to start ingestion job for data source: " + dataSourceId)
        Return null
Function GetIngestionJobStatus(knowledgeBaseId, dataSourceId, ingestionJobId)
    Try        jobStatus = BedrockAgentService.GetIngestionJob(knowledgeBaseId, dataSourceId, ingestionJobId)
        Return jobStatus
    Catch Error        LogError("Failed to get status for job: " + ingestionJobId)
        Return null
Function RunIngestionJobs(knowledgeBaseId)
    dataSources = BedrockAgentService.ListDataSources(knowledgeBaseId)

    For Each dataSource in dataSources        job = StartJob(knowledgeBaseId, dataSource.Id)

        If job is not null Then            LogInfo("Job started successfully for data source: " + dataSource.Id)

            While job.Status is not (Completed or Failed or Stopped)
                Wait for short interval                job = GetIngestionJobStatus(knowledgeBaseId, dataSource.Id, job.Id)

                If job is null Then                    Break While loop
                        If job is not null Then                LogInfo("Job completed with status: " + job.Status)
            Else                LogError("Job monitoring failed for data source: " + dataSource.Id)
        Else            LogError("Failed to start job for data source: " + dataSource.Id)

Main    knowledgeBaseId = "<your-knowledge-base-id"
    RunIngestionJobs(knowledgeBaseId)

该代码定义了三个主要函数:

  • StartJob 函数使用 StartIngestionJob API 为您提供的知识库和数据源启动摄取作业。
  • GetIngestionJobStatus 函数获取您提供的摄取作业的当前状态。
  • RunIngestionJobs 函数启动并监控您提供的知识库中数据源的摄取作业。

**注意:**如果操作失败,请查看您的错误消息以获取更多详细信息。

遵循最佳实践

为了减少将数据同步到 Amazon Bedrock 时出现的问题,请完成以下最佳实践:

AWS 官方
AWS 官方已更新 2 个月前