Vector Search with Amazon Redshift

13 minute read
Content level: Expert

This article provides procedures and functions, and walks through an example, for implementing vector search capabilities in Amazon Redshift.

Industry estimates suggest that unstructured data (things like text documents, images, videos, audio) makes up 80-90% of all data generated globally. This includes data from sources like the internet, social media, IoT devices, etc. The total volume of unstructured data is growing exponentially, with some projections indicating a 10x increase every 5 years. By 2025, the total amount of digital data created annually could reach 175 zettabytes (175 trillion gigabytes).

Vector search on unstructured data provides significant benefits. By representing data as high-dimensional vectors, vector search algorithms can efficiently find similar or related items, even in large datasets without traditional structure. This allows for powerful search, recommendation, and insight generation across text, images, audio, and other unstructured formats. The ability to rapidly identify relevant content and connections in unstructured data opens up new possibilities for understanding complex information, making discoveries, and delivering more personalized experiences to users.

Overall, vector search unlocks the value in unstructured data by surfacing relevant information and relationships that would be difficult to find through traditional search or analysis methods. In addition, it is a crucial enabler for making generative AI systems more powerful, versatile, and customized to user needs. The ability to rapidly retrieve the most relevant information greatly enhances the capabilities of language models, question-answering systems, and other AI assistants.

As of July 2024, Amazon Redshift can store massive amounts of semi-structured and unstructured data (text/varbyte/json) but does not have built-in vector search capabilities. This article shows how you can leverage services like Amazon Bedrock to:

  1. Build and trigger batch embedding jobs to represent your unstructured data as vectors.
  2. Build K-Means Clusters to optimize your search.
  3. Execute Vector Search against your data, leveraging the K-Means clusters to speed up retrieval and reduce CPU consumption.

Architecture Diagram

Once deployed, users can call a Redshift stored procedure, passing the table they want to search, the search string, the number of results they want returned, and the name of a temporary table to hold the results.

call sp_vector_search('reviews', 'bad broken unreliable slow', 100, 'searchresults');
select review_id, product_title, review_title, review_desc, similarity 
from  #searchresults 
join reviews on review_id = recordid
order by similarity desc

Results

1. Batch Embedding

Amazon Bedrock has runtime quotas on model invocations, so for large quantities of data you should upload your data to S3 and call the CreateModelInvocationJob API. Below, we use a sample data set of Amazon reviews to show how you can prepare the data for batch embedding and execute the batch embedding jobs.

Source Data

DROP TABLE IF EXISTS reviews;
CREATE TABLE reviews(
  marketplace varchar(10),
  customer_id varchar(15), 
  review_id varchar(15), 
  product_id varchar(25) DISTKEY, 
  product_parent varchar(15), 
  product_title varchar(1000), 
  star_rating int, 
  helpful_votes int, 
  total_votes int, 
  vine varchar(5), 
  verified_purchase varchar(5), 
  review_title varchar(5000),
  review_desc varchar(max),
  review_date date, 
  year int)
  
  SORTKEY (
     review_date
    );

COPY reviews  
FROM 's3://redshift-immersionday-labs/data/amazon-reviews/'
IAM_ROLE default
FORMAT AS PARQUET;

Unload Data for Batch Embedding

Upload the content you would like to embed to S3. In this example, we’re extracting a single field but you can optionally extract multiple fields from your data into a string.

To ensure the data is ready for batch embedding, the following parameters were applied:

  1. replace - In this example, there are escaped characters and un-escaped quotes. In order to export a serializable string, data cleansing is applied to a view and then used in the unload command.
  2. enable_case_sensitive_identifier - is required as the batch embedding requires certain key names.
  3. extension - is required as the batch embedding requires jsonl files.
  4. maxfilesize - a batch embedding job can target multiple files referenced by an S3 prefix, but each file can’t exceed 50K records. For this dataset, <50K records will fit into each 15MB file.
  5. dimensions - allows you to specify the number of dimensions to use for your embedding. Options include 256, 512, and 1024.

SET enable_case_sensitive_identifier TO true;

drop view if exists embedding_view;
create view embedding_view ("recordId", "modelInput") as 
select review_id, JSON_PARSE('{"dimensions": 256, "inputText": "'+replace(replace(review_desc, '\\', '&#92'), '"', '\\"')+'"}') from reviews;

UNLOAD
('select * from embedding_view')
TO 's3://bedrock-batch-invocations/input/reviews'
IAM_ROLE DEFAULT
FORMAT JSON
PARALLEL OFF
MAXFILESIZE 15 MB
EXTENSION 'jsonl'
CLEANPATH
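
To make the target format concrete, here is a minimal Python sketch of the JSON Lines records the UNLOAD above is meant to produce: one object per line with a "recordId" key and a "modelInput" object. The helper name to_batch_record and the sample review text are hypothetical. json.dumps takes care of escaping quotes and backslashes, which is the job the nested replace calls do in the SQL view.

```python
import json

def to_batch_record(record_id, text, dimensions=256):
    """Build one JSONL line in the recordId/modelInput shape used above."""
    record = {
        "recordId": record_id,
        "modelInput": {"dimensions": dimensions, "inputText": text},
    }
    # json.dumps escapes embedded quotes, backslashes and newlines, so each
    # record stays on a single line and parses back cleanly.
    return json.dumps(record)

# Hypothetical sample row containing characters that need escaping
line = to_batch_record(
    "R1KUVTPJ1NU8MQ",
    'Stopped working after a week. "Not great" \\ avoid',
)
print(line)
```

Parsing the line back with json.loads should return the original text unchanged, which is a quick way to check that a cleansing rule does not corrupt the input.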

Batch Embedding Script

To execute a Bedrock batch invocation job, use the create_model_invocation_job function, referencing the modelId (amazon.titan-embed-text-v2:0) as well as an IAM role that the Amazon Bedrock service can assume to execute the job. Ensure the IAM role has the IAM Trust Policy and IAM Permissions described below.

The script below loops through the files generated by Amazon Redshift and triggers an invocation job for each, waiting until it completes before starting the next. Replace the bucket, prefix, account, and role with values for your AWS account.

import boto3, json, sys, time
bedrock_runtime = boto3.client(service_name="bedrock-runtime", region_name="us-west-2")
bedrock = boto3.client("bedrock", region_name="us-west-2")
s3 = boto3.client('s3', region_name="us-west-2")
account = '012345678901'
role = 'bedrock-batch-invocations'
inbucket = 'bedrock-batch-invocations'
inprefix = 'input'
outbucket = 'bedrock-batch-invocations'
outprefix = 'output'
jobnum = str(int(time.time()))

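# Job statuses after which polling should stop
TERMINAL_STATUSES = ('Completed', 'PartiallyCompleted', 'Failed', 'Stopped', 'Expired')

def wait(response):
  # Returns 1 while the job is still running, 0 once it reaches a terminal status
  r1 = bedrock.get_model_invocation_job(jobIdentifier=response["jobArn"])
  if r1['status'] not in TERMINAL_STATUSES:
    print('Not done, waiting: ' + response["jobArn"] + ' Status: ' + r1['status'])
    return 1
  else:
    return 0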

#Delete Existing
objects = s3.list_objects_v2(Bucket=outbucket, Prefix=outprefix)
for obj in objects.get('Contents', []):
  key = obj['Key']
  s3.delete_object(Bucket=outbucket, Key=key)

#invoke per input file
output_config={"s3OutputDataConfig":{"s3Uri": "s3://"+outbucket+'/'+outprefix+"/"}}
objects = s3.list_objects_v2(Bucket=inbucket, Prefix=inprefix)
outfiles = []
for i,obj in enumerate(objects.get('Contents', [])):
  key = obj['Key']
  print("Processing: " + key)
  input_config={"s3InputDataConfig":{"s3Uri": "s3://"+inbucket+"/"+key,"s3InputFormat": "JSONL"}}
  response = bedrock.create_model_invocation_job(modelId="amazon.titan-embed-text-v2:0",roleArn="arn:aws:iam::"+account+":role/"+role,inputDataConfig=input_config,outputDataConfig=output_config,
    jobName="reviewsEmbeddingJob"+jobnum+str(i))
  jobArn = response["jobArn"]
  jobid = jobArn[jobArn.rfind('/')+1:]
  filename = key[key.rfind('/')+1:]
  outfiles.append({"url":"s3://"+outbucket+'/'+outprefix+"/"+jobid+"/"+filename+".out","mandatory":False})
  while wait(response):
    time.sleep(30)

# generate manifest file for loading
manifest = {"entries":outfiles}
s3.put_object(
  Body=json.dumps(manifest),
  Bucket=outbucket,
  Key=outprefix+'/output.manifest')

IAM Trust Policy

Add this trust policy to the role passed above, replacing the account number with your AWS Account.

{"Version": "2012-10-17",
  "Statement": [{
      "Effect": "Allow",
      "Principal": {"Service": "bedrock.amazonaws.com"},
      "Action": "sts:AssumeRole",
      "Condition": {
          "StringEquals": {"aws:SourceAccount": "012345678901"},
          "ArnEquals": {"aws:SourceArn": "arn:aws:bedrock:us-west-2:012345678901:model-invocation-job/*"}
      }
  }
]}

IAM Permissions

Add this permission block to the role passed above, replacing the S3 bucket/key with values for your AWS Account.

{"Version": "2012-10-17",
 "Statement": [
     {
         "Effect": "Allow",
         "Action": ["s3:GetObject","s3:PutObject","s3:ListBucket"],
         "Resource": [
             "arn:aws:s3:::bedrock-batch-invocations",
             "arn:aws:s3:::bedrock-batch-invocations/input/*",
             "arn:aws:s3:::bedrock-batch-invocations/output/*"
         ]
     }
 ]
}
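
If you manage several accounts or buckets, it may help to generate both policy documents from parameters instead of editing JSON by hand. The sketch below is one way to do that. The function name build_bedrock_batch_policies is hypothetical, and scoping the object-level resources with a trailing /* under the input and output prefixes is an assumption about how the bucket is laid out.

```python
import json

def build_bedrock_batch_policies(account, region, bucket):
    """Return (trust_policy, permissions_policy) dicts for the batch role."""
    trust = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "bedrock.amazonaws.com"},
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {"aws:SourceAccount": account},
                "ArnEquals": {
                    "aws:SourceArn": f"arn:aws:bedrock:{region}:{account}:model-invocation-job/*"
                },
            },
        }],
    }
    permissions = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
            "Resource": [
                f"arn:aws:s3:::{bucket}",           # bucket-level (ListBucket)
                f"arn:aws:s3:::{bucket}/input/*",   # assumed input prefix
                f"arn:aws:s3:::{bucket}/output/*",  # assumed output prefix
            ],
        }],
    }
    return trust, permissions

trust, permissions = build_bedrock_batch_policies(
    "012345678901", "us-west-2", "bedrock-batch-invocations")
print(json.dumps(trust, indent=2))
print(json.dumps(permissions, indent=2))
```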

Upload Embeddings

When the data is available in S3, load it using the COPY statement into a table named with the suffix _embeddings. The stored procedures later in this article assume this table name.

drop table if exists reviews_embeddings;
create table reviews_embeddings (recordid varchar(15), "modelOutput" SUPER);

copy reviews_embeddings from 's3://bedrock-batch-invocations/output/output.manifest' 
iam_role DEFAULT
MANIFEST
format as json 'auto ignorecase';
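
Before loading, it can be useful to spot-check one output line. The sketch below assumes each line of the batch output carries a "recordId" and a "modelOutput" object with an "embedding" array, which is the shape implied by the table definition above and by the "modelOutput".embedding navigation used in the queries that follow. The sample line is hypothetical and shortened to three values.

```python
import json

# Hypothetical, shortened output line; with dimensions=256 a real
# embedding array would hold 256 values.
sample_line = json.dumps({
    "recordId": "R1KUVTPJ1NU8MQ",
    "modelInput": {"dimensions": 256, "inputText": "Works fine"},
    "modelOutput": {"embedding": [0.12, -0.05, 0.33]},
})

def parse_output_line(line):
    """Return (record_id, embedding) from one line of batch output."""
    record = json.loads(line)
    return record["recordId"], record["modelOutput"]["embedding"]

record_id, embedding = parse_output_line(sample_line)
print(record_id, len(embedding))
```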

Query Embeddings

This is a simple example of how you can unnest the embedding into rows by index. This technique is used later when calculating the cosine similarity between two vectors and when computing the centroid of a set of vectors.

SET enable_case_sensitive_identifier TO true;

select recordid, mo, moi 
from reviews_embeddings re, re."modelOutput".embedding mo at moi
where recordid = 'R1KUVTPJ1NU8MQ'

Similarity Between Vectors

This example query demonstrates how to unnest both embeddings into rows, join them by index, and compute the cosine similarity (the dot product divided by the product of the vector magnitudes) when re-aggregating the data.

select sum(av::float*bv::float)/SQRT(sum(av::float*av::float)*sum(bv::float*bv::float)) similarity 
from reviews_embeddings a, a."modelOutput".embedding as av at avi,
 reviews_embeddings b, b."modelOutput".embedding as bv at bvi
where avi = bvi and 
a.recordid = 'R1KUVTPJ1NU8MQ' and
b.recordid = 'R1G5JW56N4R8JZ'
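
For readers who find the set-based form hard to follow, the same calculation can be sketched in plain Python. This is only an illustration of the formula in the query above, sum(a*b) / sqrt(sum(a*a) * sum(b*b)); the function name cosine_similarity is an assumption, and the SQL version remains the one used in the stored procedures.

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity of two equal-length, non-zero vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same number of dimensions")
    # zip pairs values by index, like the "avi = bvi" join in the SQL
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))            # orthogonal vectors: 0.0
print(cosine_similarity([1.0, 2.0, 3.0], [2.0, 4.0, 6.0]))  # same direction: approximately 1
```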

2. Build K-Means Clusters

K-means clustering provides several important advantages for large-scale vector search. It enables efficient indexing and retrieval by dividing the high-dimensional vector space into distinct clusters, allowing search to be narrowed down to the most relevant areas first. The cluster centroids also serve as a compressed representation, reducing dimensionality and helping combat the challenges of high-dimensional data. Additionally, k-means is a highly scalable algorithm that can handle growing datasets, and the clustering supports approximate nearest neighbor search for fast, if slightly less accurate, results.

Creating Clusters

This stored procedure expects a <tablename>_embeddings table to exist and creates a <tablename>_kmeans table as well as a <tablename>_kmeans_clusters table, which associates each embedding with a cluster. The purpose is that rather than running a similarity search against every embedding (in our case ~1M records), you first search against the cluster centroids and then against the members of the best cluster. With k clusters over N embeddings, that is roughly k + N/k comparisons: for example, 100 clusters of 1,000 members each means comparing against 1,100 records instead of 100,000. This stored procedure is also available in the Amazon Redshift UDFs repository.
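
The saving from searching centroids first can be estimated with a few lines of Python. This is a back-of-the-envelope sketch that assumes equally sized clusters; the function name comparisons_with_clusters is hypothetical.

```python
import math

def comparisons_with_clusters(n_embeddings, n_clusters):
    """Centroid comparisons plus comparisons against one cluster's members."""
    members_per_cluster = math.ceil(n_embeddings / n_clusters)
    return n_clusters + members_per_cluster

# 100 clusters of 1,000 members: 100 centroids + 1,000 members
print(comparisons_with_clusters(100_000, 100))  # 1100
```

Under the same assumption, the total k + N/k is smallest when the number of clusters is close to the square root of the number of embeddings.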

Note: Cosine Similarity can be achieved using a python UDF and the numpy library making the below SQL easier to read. However, the following method of un-nesting the SUPER & re-aggregating was more performant.

CREATE OR REPLACE PROCEDURE sp_kmeans (tablename IN varchar, clusters IN int) AS $$
DECLARE 
    cluster_size int;
    cluster int := 1;
    similarity float;
    i int;
BEGIN
    --will error if table doesn't exist
    EXECUTE 'select * from '||tablename||'_embeddings limit 1';

    -- cast to float so CEIL rounds up; integer division would truncate first
    EXECUTE 'SELECT CEIL(count(1)::float/'||clusters ||') 
        from ' || tablename||'_embeddings' INTO cluster_size;
    
    -- create kmeans tables and choose random starting centroids
    EXECUTE 'CREATE TABLE IF NOT EXISTS ' || tablename || '_kmeans (
        cluster int, centroid SUPER, startts timestamp, endts timestamp, 
        interations int) DISTSTYLE ALL';
    EXECUTE 'TRUNCATE TABLE ' || tablename || '_kmeans';

    EXECUTE 'CREATE TABLE IF NOT EXISTS ' || tablename || '_kmeans_clusters (
        cluster int, recordid varchar(15), similarity float, rnk int) 
        DISTKEY(recordid)';
    EXECUTE 'TRUNCATE TABLE ' || tablename || '_kmeans_clusters';
    
    WHILE cluster <= clusters LOOP
        --choose a random starting centroid from the remaining embeddings
        EXECUTE 'INSERT INTO ' || tablename || '_kmeans
            SELECT '||cluster||', "modelOutput".embedding, 
                CURRENT_TIMESTAMP, NULL, NULL 
            FROM ' || tablename || '_embeddings 
            WHERE "modelOutput".embedding is not null 
            AND recordid not in (
                select recordid from ' || tablename || '_kmeans_clusters) LIMIT 1';
        COMMIT;
        i := 1;
        similarity := 0;
        WHILE similarity < .999 LOOP
            --get embeddings closest to centroid
            EXECUTE 'DELETE FROM ' || tablename || '_kmeans_clusters 
                where cluster = '||cluster;
            EXECUTE 'INSERT INTO ' || tablename || '_kmeans_clusters
                select * from (select *, rank() over (partition by k.cluster order by k.similarity desc) rnk from (
                    select cluster, e.recordid, sum(kv::float*ev::float)/SQRT(sum(kv::float*kv::float)*sum(ev::float*ev::float)) similarity 
                    from ' || tablename || '_kmeans k, k.centroid kv at kvi,
                        ' || tablename || '_embeddings e, e."modelOutput".embedding ev at evi 
                    where kvi = evi and k.cluster = '||cluster||'
                    AND e.recordid not in (
                        select recordid from ' || tablename || '_kmeans_clusters)
                    group by 1,2) k
                ) r where r.rnk <= ' || cluster_size;
            COMMIT;
            -- determine new center
            EXECUTE 'DROP TABLE IF EXISTS #centroid';
            EXECUTE 'CREATE TABLE #centroid as
                SELECT JSON_PARSE(''['' || listagg(po::varchar, '','') within group (order by poi) || '']'') centroid 
                FROM (
                    select poi, avg(po::float) po
                    from ' || tablename || '_kmeans_clusters as nn, ' || tablename || '_embeddings re, re."modelOutput".embedding as po at poi
                        where nn.recordid = re.recordid and nn.cluster = ' || cluster || '
                    group by poi) as c';
            COMMIT;
            -- determine distance from new center to old center
            EXECUTE 'SELECT sum(kv::float*mv::float)/SQRT(sum(kv::float*kv::float)*sum(mv::float*mv::float))  
                    from #centroid k, k.centroid kv at kvi,
                        ' || tablename || '_kmeans m, m.centroid mv at mvi 
                    where m.cluster = '|| cluster ||' and kvi = mvi' INTO similarity;
            COMMIT;
            EXECUTE 'UPDATE ' || tablename || '_kmeans SET centroid = (select centroid from #centroid), endts = CURRENT_TIMESTAMP, interations = '|| i ||' where cluster = ' || cluster;
            COMMIT;
            i := i+1;
            COMMIT;
        END LOOP;
        cluster := cluster+1;
    END LOOP;
END
$$ LANGUAGE plpgsql;

--USAGE
call sp_kmeans('reviews', 10)
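
The loop structure of the procedure may be easier to follow in Python. The sketch below mirrors its logic on in-memory data, under stated assumptions: it is not the stored procedure itself, the function names are hypothetical, the max_iter safety cap is an addition that the SQL does not have, and ties are cut off at cluster_size where the SQL rank() would keep them. Embeddings are assumed to be non-zero vectors.

```python
import math

def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    return dot / math.sqrt(sum(x * x for x in a) * sum(y * y for y in b))

def mean_vector(vectors):
    # Element-wise average, like avg(po) grouped by index in the SQL
    return [sum(col) / len(vectors) for col in zip(*vectors)]

def balanced_clusters(embeddings, n_clusters, threshold=0.999, max_iter=50):
    """embeddings: dict of record id -> vector. Returns (centroids, assignment)."""
    cluster_size = math.ceil(len(embeddings) / n_clusters)
    assignment = {}  # record id -> cluster number
    centroids = {}   # cluster number -> centroid vector
    for cluster in range(1, n_clusters + 1):
        # Only records not yet claimed by an earlier cluster are candidates
        remaining = [r for r in embeddings if r not in assignment]
        if not remaining:
            break
        centroid = embeddings[remaining[0]]  # starting centroid
        for _ in range(max_iter):
            # Take the cluster_size records closest to the current centroid
            ranked = sorted(remaining,
                            key=lambda r: cosine(centroid, embeddings[r]),
                            reverse=True)
            members = ranked[:cluster_size]
            new_centroid = mean_vector([embeddings[r] for r in members])
            # Stop once the centroid has (almost) stopped moving
            converged = cosine(new_centroid, centroid) >= threshold
            centroid = new_centroid
            if converged:
                break
        centroids[cluster] = centroid
        for r in members:
            assignment[r] = cluster
    return centroids, assignment

# Hypothetical toy data: two obvious groups in two dimensions
toy = {"a": [1.0, 0.1], "b": [1.0, 0.2], "c": [0.1, 1.0], "d": [0.2, 1.0]}
centroids, assignment = balanced_clusters(toy, 2)
print(assignment)
```

Because each cluster only draws from records that earlier clusters left behind, the clusters come out equally sized, which keeps the second stage of the search at a predictable cost.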

3. Execute Vector Search

Lambda UDF for On-the-Fly embeddings

Create a Lambda UDF to embed your search string at query time. Ensure the IAM role specified in the function definition and associated with your Redshift cluster has the lambda:InvokeFunction permission on the Lambda function you create. This Lambda UDF is also available in the Amazon Redshift UDFs repository.

Note: Amazon Bedrock has runtime quotas on model invocations (2,000 / min) so you should not use this for batch embeddings.

CREATE OR REPLACE EXTERNAL FUNCTION f_titan_embedding (varchar) RETURNS varchar(max) STABLE
LAMBDA 'f-titan-embedding-varchar' IAM_ROLE DEFAULT;

Create a Python Lambda function using the following code:

import boto3, json, sys
bedrock_runtime = boto3.client(service_name="bedrock-runtime")

def generate_embeddings(text=None):
    try:
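        # dimensions must match the value used for the batch embeddings (256 in this example)
        input_data = {"inputText": text, "dimensions": 256}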
        response = bedrock_runtime.invoke_model(
            body=json.dumps(input_data),
            modelId="amazon.titan-embed-text-v2:0",
            accept="application/json",
            contentType="application/json"
        )
        response = response.get("body").read().decode('utf-8')
        response_json = json.loads(response)
        return response_json.get("embedding")
        
    except Exception as e:
        exc_type, exc_obj, exc_tb = sys.exc_info()
        print(exc_type, exc_tb.tb_lineno)
        print('Error: '+ str(e))
        return ''
        
def handler(event, context):
    print(event)
    redshift_response = {"success": False, "num_records": event["num_records"]}
    try:
        result = []
        for row in event["arguments"]:
            try:
                embedding = generate_embeddings(row[0])
                result.append(json.dumps(embedding))
            except Exception as e:
                print(f"Error: {e}")
                result.append(None)
        redshift_response["success"] = True
        redshift_response["results"] = result
    except Exception as e:
        redshift_response["error_msg"] = str(e)
        print('Error: '+ str(e))
        exc_type, exc_obj, exc_tb = sys.exc_info()
        print(exc_type, exc_tb.tb_lineno)
    return json.dumps(redshift_response)
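
The request and response shapes the handler relies on can be exercised locally without calling Bedrock. The sketch below keeps the same contract as the code above (an event with "num_records" and "arguments", a JSON reply with "success" and "results") but swaps in a stand-in embedding function. Both fake_embedding and the embed parameter are hypothetical and exist only for this local test.

```python
import json

def fake_embedding(text):
    """Hypothetical stand-in for the Bedrock call: returns a tiny vector."""
    return [float(len(text)), float(text.count(" "))]

def handler(event, context=None, embed=fake_embedding):
    response = {"success": False, "num_records": event["num_records"]}
    try:
        results = []
        # One inner list per row; the first element is the UDF argument
        for row in event["arguments"]:
            results.append(json.dumps(embed(row[0])))
        response["success"] = True
        response["results"] = results
    except Exception as e:
        response["error_msg"] = str(e)
    return json.dumps(response)

# Simulated batch of two rows, as the UDF would send them
event = {"num_records": 2, "arguments": [["bad broken"], ["slow"]]}
reply = json.loads(handler(event))
print(reply["success"], len(reply["results"]))
```

Each entry in "results" is itself a JSON string, which is why the search procedure wraps the UDF call in JSON_PARSE.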

The following policy should be added to the Lambda Execution role to use Amazon Bedrock.

{
    "Action": ["bedrock:InvokeModel"],
    "Resource": ["arn:aws:bedrock:us-west-2::foundation-model/amazon.titan-embed-text-v2:0"],
    "Effect": "Allow"
}

Runtime Search

Create the following stored procedure to support vector search. This stored procedure is also available in the Amazon Redshift UDFs repository. The query performs the following steps:

  1. Use the Lambda UDF to embed the search string
  2. Compare that embedding to the K-Means centroids to find the best cluster
  3. Compare the embedding to all members of the cluster to calculate the similarity to each member
  4. Load the N best matches from the cluster into a temporary table

CREATE OR REPLACE PROCEDURE sp_vector_search (tablename IN varchar, search IN varchar, cnt IN int, tmp_name IN varchar) AS $$
BEGIN
    EXECUTE 'drop table if exists #'||tmp_name;
    EXECUTE 'create table #'||tmp_name ||' (recordid varchar(100), similarity float)';
    EXECUTE 'insert into #'||tmp_name ||'
        select re.recordid, sum(rv::float*qv::float)/SQRT(sum(rv::float*rv::float)*sum(qv::float*qv::float)) esimilarity
        from (
            select k.cluster, q.q, sum(kv::float*qv::float)/SQRT(sum(kv::float*kv::float)*sum(qv::float*qv::float)) csimilarity
            from '||tablename||'_kmeans k, k.centroid kv at kvi, 
            (select JSON_PARSE(f_titan_embedding('''+search+''')) as q) q, q.q qv at qvi
            where kvi = qvi 
            group by 1,2
            qualify rank() over (order by csimilarity desc) = 1
        ) q, '||tablename||'_kmeans_clusters c, '||tablename||'_embeddings re, q.q qv at qvi, re."modelOutput".embedding rv at rvi 
        where rvi = qvi and c.cluster = q.cluster and c.recordid = re.recordid
        group by 1
        qualify rank() over (order by esimilarity desc) <= '||cnt;
END $$ LANGUAGE plpgsql;
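
The two-stage lookup inside the procedure can be sketched in Python as follows. This is an illustration on in-memory dictionaries, not the procedure itself; the function names and the toy data are hypothetical, and the query vector is passed in directly instead of being produced by the Lambda UDF.

```python
import math

def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    return dot / math.sqrt(sum(x * x for x in a) * sum(y * y for y in b))

def vector_search(query, centroids, members, embeddings, top_n):
    """Return the top_n (record id, similarity) pairs from the best cluster."""
    # Stage 1: pick the cluster whose centroid is most similar to the query
    best = max(centroids, key=lambda c: cosine(query, centroids[c]))
    # Stage 2: score only that cluster's members against the query
    scored = [(rid, cosine(query, embeddings[rid])) for rid in members[best]]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_n]

# Hypothetical toy data: two clusters of two records each
embeddings = {"a": [1.0, 0.1], "b": [1.0, 0.2], "c": [0.1, 1.0], "d": [0.2, 1.0]}
centroids = {1: [1.0, 0.15], 2: [0.15, 1.0]}
members = {1: ["a", "b"], 2: ["c", "d"]}

results = vector_search([1.0, 0.0], centroids, members, embeddings, top_n=2)
print([rid for rid, _ in results])
```

Because only one cluster is scanned, a close match that was assigned to a neighboring cluster will be missed. That is the usual trade-off of approximate nearest neighbor search.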

Call the stored procedure passing in the table you want to search, the search string, the number of results you want returned and the temp table to store the results. Once complete, you can use the results to join back to the original data set to return the contextual information.

call sp_vector_search('reviews', 'bad broken unreliable slow', 100, 'searchresults');
select review_id, product_title, review_title, review_desc, similarity 
from  #searchresults 
join reviews on review_id = recordid
order by similarity desc

Results
