Vector Search with Amazon Redshift
This article provides procedures and functions, and walks through an example, showing how to implement vector search capabilities in Amazon Redshift.
Industry estimates suggest that unstructured data (things like text documents, images, videos, audio) makes up 80-90% of all data generated globally. This includes data from sources like the internet, social media, IoT devices, etc. The total volume of unstructured data is growing exponentially, with some projections indicating a 10x increase every 5 years. By 2025, the total amount of digital data created annually could reach 175 zettabytes (175 trillion gigabytes).
Vector search on unstructured data provides significant benefits. By representing data as high-dimensional vectors, vector search algorithms can efficiently find similar or related items, even in large datasets without traditional structure. This allows for powerful search, recommendation, and insight generation across text, images, audio, and other unstructured formats. The ability to rapidly identify relevant content and connections in unstructured data opens up new possibilities for understanding complex information, making discoveries, and delivering more personalized experiences to users.
Overall, vector search unlocks the value in unstructured data by surfacing relevant information and relationships that would be difficult to find through traditional search or analysis methods. In addition, it is a crucial enabler for making generative AI systems more powerful, versatile, and customized to user needs. The ability to rapidly retrieve the most relevant information greatly enhances the capabilities of language models, question-answering systems, and other AI assistants.
As of July 2024, Amazon Redshift, while able to store massive amounts of semi-structured and unstructured data (text/varbyte/JSON), did not have native vector search capabilities. This article shows how you can leverage services like Amazon Bedrock to:
- Build and trigger batch embedding jobs to represent your unstructured data as vectors.
- Build K-Means Clusters to optimize your search.
- Execute Vector Search against your data; leveraging the K-Means cluster to speed up retrieval and reduce CPU consumption.
Once deployed, users can call a Redshift Stored Procedure to indicate the table they want to search, the string, and the number of results they want returned.
call sp_vector_search('reviews', 'bad broken unreliable slow', 100, 'searchresults');

select review_id, product_title, review_title, review_desc, similarity
from #searchresults
join reviews on review_id = recordid
order by similarity desc
1. Batch Embedding
Amazon Bedrock has runtime quotas on model invocations, so for large quantities of data you should upload your data to S3 and call the CreateModelInvocationJob API. Below, we'll use a sample data set of Amazon reviews and show how you can prepare the data for batch embedding and execute the batch embedding jobs.
Source Data
DROP TABLE reviews;

CREATE TABLE reviews(
  marketplace varchar(10),
  customer_id varchar(15),
  review_id varchar(15),
  product_id varchar(25) DISTKEY,
  product_parent varchar(15),
  product_title varchar(1000),
  star_rating int,
  helpful_votes int,
  total_votes int,
  vine varchar(5),
  verified_purchase varchar(5),
  review_title varchar(5000),
  review_desc varchar(max),
  review_date date,
  year int)
SORTKEY ( review_date );

COPY reviews
FROM 's3://redshift-immersionday-labs/data/amazon-reviews/'
IAM_ROLE default
FORMAT AS PARQUET;
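Once the COPY completes, a quick sanity check confirms the load (a minimal sketch, assuming the table above; counts will vary with the snapshot of the dataset you load):

-- Sanity check: row count and review date range of the loaded data
select count(*) as review_count,
       min(review_date) as first_review,
       max(review_date) as last_review
from reviews;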
Unload Data for Batch Embedding
Upload the content you would like to embed to S3. In this example, we’re extracting a single field but you can optionally extract multiple fields from your data into a string.
To ensure the data is ready for batch embedding, the following settings were applied:
- replace - In this example, there are escaped characters and un-escaped quotes. In order to export a serializable string, data cleansing is applied in a view which is then used in the unload command.
- enable_case_sensitive_identifier - is required as the batch embedding input requires certain key names (recordId, modelInput).
- extension - is required as the batch embedding requires jsonl files.
- maxfilesize - a batch embedding job can target multiple files referenced by an S3 prefix, but each file can't exceed 50K records. For this dataset, <50K records will fit into each 15MB file.
- dimensions - allows you to specify the number of dimensions to use for your embedding. Options include 256, 512 & 1024.
SET enable_case_sensitive_identifier TO true;

drop view embedding_view;

create view embedding_view ("recordId", "modelInput") as
select review_id,
       JSON_PARSE('{"dimensions": 256, "inputText": "'||replace(replace(review_desc, '\\', '\'), '"', '\\"')||'"}')
from reviews;

UNLOAD ('select * from embedding_view')
TO 's3://bedrock-batch-invocations/input/reviews'
IAM_ROLE DEFAULT
FORMAT JSON
PARALLEL OFF
MAXFILESIZE 15 MB
EXTENSION 'jsonl'
CLEANPATH
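To verify that each unloaded file stays under the 50K-record limit before triggering the batch jobs, one option is to check the line counts recorded for the UNLOAD; this is a sketch using the STL_UNLOAD_LOG system table and the bucket/prefix from the example above:

-- Sketch: rows written per unloaded file (each should be well under 50K)
select path, line_count
from stl_unload_log
where path like 's3://bedrock-batch-invocations/input/reviews%'
order by path;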
Batch Embedding Script
To execute a Bedrock batch invocation job, leverage the create_model_invocation_job function, referencing the modelId (amazon.titan-embed-text-v2:0) as well as an IAM role which the Amazon Bedrock service can assume to execute the job. Ensure the IAM role has the IAM Trust Policy & IAM Permissions described below.
The below script loops through the files generated by Amazon Redshift and triggers an invocation for each, waiting until it's complete before starting the next invocation. Replace the bucket, prefix, account and role values with values for your AWS account.
import boto3, json, sys, time

bedrock_runtime = boto3.client(service_name="bedrock-runtime", region_name="us-west-2")
bedrock = boto3.client("bedrock", region_name="us-west-2")
s3 = boto3.client('s3', region_name="us-west-2")

account = '012345678901'
role = 'bedrock-batch-invocations'
inbucket = 'bedrock-batch-invocations'
inprefix = 'input'
outbucket = 'bedrock-batch-invocations'
outprefix = 'output'
jobnum = str(int(time.time()))

def wait(response):
    r1 = bedrock.get_model_invocation_job(jobIdentifier=response["jobArn"])
    if r1['status'] != 'Completed' and r1['status'] != 'Failed':
        print('Not done, waiting: ' + response["jobArn"] + ' Status: ' + r1['status'])
        return 1
    else:
        return 0

# Delete existing output objects
objects = s3.list_objects_v2(Bucket=outbucket, Prefix=outprefix)
for obj in objects.get('Contents', []):
    key = obj['Key']
    s3.delete_object(Bucket=outbucket, Key=key)

# Invoke one batch job per input file
output_config = {"s3OutputDataConfig": {"s3Uri": "s3://"+outbucket+'/'+outprefix+"/"}}
objects = s3.list_objects_v2(Bucket=inbucket, Prefix=inprefix)
outfiles = []
for i, obj in enumerate(objects.get('Contents', [])):
    key = obj['Key']
    print("Processing: " + key)
    input_config = {"s3InputDataConfig": {"s3Uri": "s3://"+inbucket+"/"+key, "s3InputFormat": "JSONL"}}
    response = bedrock.create_model_invocation_job(
        modelId="amazon.titan-embed-text-v2:0",
        roleArn="arn:aws:iam::"+account+":role/"+role,
        inputDataConfig=input_config,
        outputDataConfig=output_config,
        jobName="reviewsEmbeddingJob"+jobnum+str(i))
    jobArn = response["jobArn"]
    jobid = jobArn[jobArn.rfind('/')+1:]
    filename = key[key.rfind('/')+1:]
    outfiles.append({"url": "s3://"+outbucket+'/'+outprefix+"/"+jobid+"/"+filename+".out", "mandatory": False})
    while wait(response):
        time.sleep(30)

# Generate manifest file for loading the output into Redshift
manifest = {"entries": outfiles}
s3.put_object(Body=json.dumps(manifest), Bucket=outbucket, Key=outprefix+'/output.manifest')
IAM Trust Policy
Add this trust policy to the role passed above, replacing the account number with your AWS Account.
{"Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": {"Service": "bedrock.amazonaws.com"}, "Action": "sts:AssumeRole", "Condition": { "StringEquals": {"aws:SourceAccount": "012345678901"}, "ArnEquals": {"aws:SourceArn": "arn:aws:bedrock:us-west-2:012345678901:model-invocation-job/*"} } } ]}
IAM Permissions
Add this permission block to the role passed above, replacing the S3 bucket/key with values for your AWS Account.
{"Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject","s3:PutObject","s3:ListBucket"], "Resource": [ "arn:aws:s3:::bedrock-batch-invocations", "arn:aws:s3:::bedrock-batch-invocations/input", "arn:aws:s3:::bedrock-batch-invocations/output" ] } ] }
Upload Embeddings
When the data is available in S3, load it using the COPY statement into a table with the suffix _embeddings. This table name will be assumed by the stored procedures later.
drop table reviews_embeddings;

create table reviews_embeddings (
  recordid varchar(15),
  "modelOutput" SUPER);

copy reviews_embeddings
from 's3://bedrock-batch-invocations/output/output.manifest'
iam_role DEFAULT
MANIFEST
format as json 'auto ignorecase';
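Before building clusters, it is worth verifying that every review received an embedding. A minimal sketch using the table names above (with enable_case_sensitive_identifier set to true, as shown below):

-- Sketch: compare loaded embeddings against the source table and look for failed records
select (select count(*) from reviews) as review_count,
       (select count(*) from reviews_embeddings) as embedding_count,
       (select count(*) from reviews_embeddings
         where "modelOutput".embedding is null) as missing_embeddings;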
Query Embeddings
This is a simple example of how you can unnest the embedding into rows by index. This technique is used when calculating the cosine similarity between two vectors as well as when computing the centroid of a set of vectors.
SET enable_case_sensitive_identifier TO true;

select recordid, mo, moi
from reviews_embeddings re, re."modelOutput".embedding mo at moi
where recordid = 'R1KUVTPJ1NU8MQ'
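The same unnesting pattern can also be used to confirm that an embedding has the expected number of elements; a small sketch, assuming the 256 dimensions chosen earlier:

-- Sketch: count the array elements for one embedding; it should match the requested dimensions (256)
select recordid, count(*) as dimensions
from reviews_embeddings re, re."modelOutput".embedding mo at moi
where recordid = 'R1KUVTPJ1NU8MQ'
group by recordid;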
Similarity Between Vectors
This is an example query which demonstrates how to unnest the embeddings into rows and then compute the dot product (cosine similarity) when re-aggregating the data.
select sum(av::float*bv::float)/SQRT(sum(av::float*av::float)*sum(bv::float*bv::float)) similarity
from reviews_embeddings a, a."modelOutput".embedding as av at avi,
     reviews_embeddings b, b."modelOutput".embedding as bv at bvi
where avi = bvi
  and a.recordid = 'R1KUVTPJ1NU8MQ'
  and b.recordid = 'R1G5JW56N4R8JZ'
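Extending this pairwise query, a brute-force nearest-neighbour search against a single review looks like the following sketch; with a large table this full scan is exactly the cost that the k-means clusters in the next section are designed to avoid:

-- Sketch: brute-force top 10 most similar reviews to one review (scans every embedding)
select b.recordid,
       sum(av::float*bv::float)/SQRT(sum(av::float*av::float)*sum(bv::float*bv::float)) similarity
from reviews_embeddings a, a."modelOutput".embedding as av at avi,
     reviews_embeddings b, b."modelOutput".embedding as bv at bvi
where avi = bvi
  and a.recordid = 'R1KUVTPJ1NU8MQ'
  and b.recordid <> a.recordid
group by b.recordid
order by similarity desc
limit 10;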
2. Build K-Means Clusters
K-means clustering provides several important advantages for large-scale vector search. It enables efficient indexing and retrieval by dividing the high-dimensional vector space into distinct clusters, allowing search to be narrowed down to the most relevant areas first. The cluster centroids also serve as a compressed representation, reducing dimensionality and helping combat the challenges of high-dimensional data. Additionally, k-means is a highly scalable algorithm that can handle growing datasets, and the clustering supports approximate nearest neighbor search for fast, if slightly less accurate, results.
Creating Clusters
This stored procedure expects a <tablename>_embeddings table to exist and will create a <tablename>_kmeans table as well as a <tablename>_kmeans_clusters table which associates the embeddings with a cluster. The purpose is that rather than doing a similarity search against every embedding (in our case ~1M records), you can first do a search against the centroids of your clusters, then do a search against the members of the best cluster. For example, with clusters of roughly 1,000 members each, a search only compares the query vector against the cluster centroids plus the ~1,000 members of the best-matching cluster, instead of against every embedding. This stored procedure is also available in the Amazon Redshift UDFs repository.
Note: Cosine similarity can be achieved using a Python UDF and the numpy library, making the below SQL easier to read. However, the following method of un-nesting the SUPER & re-aggregating was more performant.
CREATE OR REPLACE PROCEDURE sp_kmeans (tablename IN varchar, clusters IN int)
AS $$
DECLARE
  cluster_size int;
  cluster int := 1;
  similarity float;
  i int;
BEGIN
  --will error if table doesn't exist
  EXECUTE 'select * from '||tablename||'_embeddings limit 1';
  EXECUTE 'SELECT CEIL(count(1)::float/'||clusters||') from '||tablename||'_embeddings' INTO cluster_size;

  -- create kmeans tables
  EXECUTE 'CREATE TABLE IF NOT EXISTS '||tablename||'_kmeans (
    cluster int, centroid SUPER, startts timestamp, endts timestamp, iterations int) DISTSTYLE ALL';
  EXECUTE 'TRUNCATE TABLE '||tablename||'_kmeans';
  EXECUTE 'CREATE TABLE IF NOT EXISTS '||tablename||'_kmeans_clusters (
    cluster int, recordid varchar(15), similarity float, rnk int) DISTKEY(recordid)';
  EXECUTE 'TRUNCATE TABLE '||tablename||'_kmeans_clusters';

  WHILE cluster <= clusters LOOP
    --choose a random starting centroid from the remaining embeddings
    EXECUTE 'INSERT INTO '||tablename||'_kmeans
      SELECT '||cluster||', "modelOutput".embedding, CURRENT_TIMESTAMP, NULL, NULL
      FROM '||tablename||'_embeddings
      WHERE "modelOutput".embedding is not null
        AND "recordId" not in (select recordid from '||tablename||'_kmeans_clusters)
      LIMIT 1';
    COMMIT;
    i := 1;
    similarity := 0;
    WHILE similarity < .999 LOOP
      --get embeddings closest to centroid
      EXECUTE 'DELETE FROM '||tablename||'_kmeans_clusters where cluster = '||cluster;
      EXECUTE 'INSERT INTO '||tablename||'_kmeans_clusters
        select * from (
          select *, rank() over (partition by k.cluster order by k.similarity desc) rnk
          from (
            select cluster, e."recordId",
              sum(kv::float*ev::float)/SQRT(sum(kv::float*kv::float)*sum(ev::float*ev::float)) similarity
            from '||tablename||'_kmeans k, k.centroid kv at kvi,
                 '||tablename||'_embeddings e, e."modelOutput".embedding ev at evi
            where kvi = evi and k.cluster = '||cluster||'
              AND e."recordId" not in (select recordid from '||tablename||'_kmeans_clusters)
            group by 1,2) k
        ) r where r.rnk <= '||cluster_size;
      COMMIT;
      -- determine new center
      EXECUTE 'DROP TABLE IF EXISTS #centroid';
      EXECUTE 'CREATE TABLE #centroid as
        SELECT JSON_PARSE(''['' || listagg(po::varchar, '','') within group (order by poi) || '']'') centroid
        FROM (
          select poi, avg(po::float) po
          from '||tablename||'_kmeans_clusters as nn,
               '||tablename||'_embeddings re, re."modelOutput".embedding as po at poi
          where nn.recordid = re."recordId" and nn.cluster = '||cluster||'
          group by poi) as c';
      COMMIT;
      -- determine distance from new center to old center
      EXECUTE 'SELECT sum(kv::float*mv::float)/SQRT(sum(kv::float*kv::float)*sum(mv::float*mv::float))
        from #centroid k, k.centroid kv at kvi,
             '||tablename||'_kmeans m, m.centroid mv at mvi
        where m.cluster = '||cluster||' and kvi = mvi' INTO similarity;
      COMMIT;
      EXECUTE 'UPDATE '||tablename||'_kmeans
        SET centroid = (select centroid from #centroid),
            endts = CURRENT_TIMESTAMP,
            iterations = '||i||'
        where cluster = '||cluster;
      COMMIT;
      i := i+1;
      COMMIT;
    END LOOP;
    cluster := cluster+1;
  END LOOP;
END
$$ LANGUAGE plpgsql;

--USAGE
call sp_kmeans('reviews', 10)
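After the procedure finishes, you can inspect how the clustering turned out; for example, a sketch (using the tables created above) of the size of each cluster and how many iterations it took to converge:

-- Sketch: cluster sizes and convergence stats produced by sp_kmeans
select k.cluster,
       count(c.recordid) as members,
       max(k.iterations) as iterations,
       max(k.endts) - max(k.startts) as duration
from reviews_kmeans k
left join reviews_kmeans_clusters c on c.cluster = k.cluster
group by k.cluster
order by k.cluster;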
3. Execute Vector Search
Lambda UDF for On-the-Fly embeddings
Create a Lambda UDF to support runtime embedding of your search query. Ensure the IAM role specified (and associated with your cluster or workgroup) has the lambda:InvokeFunction permission on the Lambda function you create. This Lambda UDF is also available in the Amazon Redshift UDFs repository.
Note: Amazon Bedrock has runtime quotas on model invocations (2,000/min), so you should not use this Lambda UDF for batch embeddings.
CREATE OR REPLACE EXTERNAL FUNCTION f_titan_embedding (varchar)
RETURNS varchar(max) STABLE
LAMBDA 'f-titan-embedding-varchar'
IAM_ROLE DEFAULT;
Create a Python Lambda function using the following code:
import boto3, json, sys

bedrock_runtime = boto3.client(service_name="bedrock-runtime")

def generate_embeddings(text=None):
    try:
        input_data = {"inputText": text}
        response = bedrock_runtime.invoke_model(
            body=json.dumps(input_data),
            modelId="amazon.titan-embed-text-v2:0",
            accept="application/json",
            contentType="application/json"
        )
        response = response.get("body").read().decode('utf-8')
        response_json = json.loads(response)
        return response_json.get("embedding")
    except Exception as e:
        exc_type, exc_obj, exc_tb = sys.exc_info()
        print(exc_type, exc_tb.tb_lineno)
        print('Error: ' + str(e))
        return ''

def handler(event, context):
    print(event)
    redshift_response = {"success": False, "num_records": event["num_records"]}
    try:
        result = []
        for row in event["arguments"]:
            try:
                embedding = generate_embeddings(row[0])
                result.append(json.dumps(embedding))
            except Exception as e:
                print(f"Error: {e}")
                result.append(None)
        redshift_response["success"] = True
        redshift_response["results"] = result
    except Exception as e:
        redshift_response["error_msg"] = str(e)
        print('Error: ' + str(e))
        exc_type, exc_obj, exc_tb = sys.exc_info()
        print(exc_type, exc_tb.tb_lineno)
    return json.dumps(redshift_response)
The following policy should be added to the Lambda Execution role to use Amazon Bedrock.
{ "Action": ["bedrock:InvokeModel"], "Resource": ["arn:aws:bedrock:us-west-2::foundation-model/amazon.titan-embed-text-v2:0"], "Effect": "Allow" }
Runtime Search
Create the following stored procedure to support vector search. This stored procedure is also available in the Amazon Redshift UDFs repository. The procedure performs the following steps:
- Use the Lambda UDF to embed the search string
- Compare that embedding to the K-Means centroids to find the best cluster
- Compare the embedding to all members of the cluster to calculate the similarity to each member
- Load the N best matches from the cluster into a temporary table
CREATE OR REPLACE PROCEDURE sp_vector_search (tablename IN varchar, search IN varchar, cnt IN int, tmp_name IN varchar)
AS $$
BEGIN
  EXECUTE 'drop table if exists #'||tmp_name;
  EXECUTE 'create table #'||tmp_name||' (recordid varchar(100), similarity float)';
  EXECUTE 'insert into #'||tmp_name||'
    select re."recordId",
      sum(rv::float*qv::float)/SQRT(sum(rv::float*rv::float)*sum(qv::float*qv::float)) esimilarity
    from (
      select k.cluster, q.q,
        sum(kv::float*qv::float)/SQRT(sum(kv::float*kv::float)*sum(qv::float*qv::float)) csimilarity
      from '||tablename||'_kmeans k, k.centroid kv at kvi,
           (select JSON_PARSE(f_titan_embedding('''||search||''')) as q) q, q.q qv at qvi
      where kvi = qvi
      group by 1,2
      qualify rank() over (order by csimilarity desc) = 1
    ) q,
    '||tablename||'_kmeans_clusters c,
    '||tablename||'_embeddings re,
    q.q qv at qvi,
    re."modelOutput".embedding rv at rvi
    where rvi = qvi
      and c.cluster = q.cluster
      and c.recordid = re."recordId"
    group by 1
    qualify rank() over (order by esimilarity desc) <= '||cnt;
END
$$ LANGUAGE plpgsql;
Call the stored procedure, passing in the table you want to search, the search string, the number of results you want returned, and the temp table to store the results. Once complete, you can join the results back to the original data set to return the contextual information.
call sp_vector_search('reviews', 'bad broken unreliable slow', 100, 'searchresults');

select review_id, product_title, review_title, review_desc, similarity
from #searchresults
join reviews on review_id = recordid
order by similarity desc