Stepfunction과 Opensearch를 이용한 동영상 검색 웹애플리케이션 구축 방법
6분 분량
콘텐츠 수준: 중급
0
현재 기사에서는 S3에 동영상을 업로드 하면 자동적으로 음성을 인식하고 Bedrock을 통해 분석한뒤 Opensearch에서 검색할수있는 웹 어플리케이션을 구축하는 방법에 대해 안내합니다.
아키텍처
사용자가 업로드한 동영상에 대해 음성인식을 통해 텍스트를 추출합니다. 이후에 Bedrock을 이용하여 메타데이터(요약, 키워드 등)를 추출한 후, Step Functions를 이용해 데이터를 처리하고 OpenSearch에 인덱싱하여, 사용자가 입력하는 키워드나 필터를 기반으로 빠르게 동영상을 검색할 수 있는 워크플로를 구축합니다.
사전 준비 사항
- S3 버킷 ICN리전에 존재해야 합니다.
- Opensearch 클러스터가 ICN 리전에 존재해야합니다.
- Bedrock의 anthropic.claude-3-haiku 모델에 대한 권한 설정이 되어있어야 합니다.
- Local 환경에서의 AWS 자격증명이 되어있어야합니다. (해당 예시에서는 AdministratorAccess권한으로 설정합니다. 최소권한 원칙을 참고하여 설정하시는것을 권장드립니다.)
Step1. Lambda 에 사용할 Layer 생성
- 명령 프롬프트를 열고 my-openseach-function 프로젝트 디렉터리를 만듭니다.
mkdir my-opensearch-function
- my-sourcecode-function 프로젝트 디렉터리로 이동합니다.
cd my-opensearch-function
- 외부 라이브러리를 새package 디렉터리에 설치합니다.
pip3 install --target ./package boto3
pip3 install --target ./package requests
pip3 install --target ./package requests_aws4auth
- 루트에서 설치된 라이브러리를 포함하는 배포 패키지를 만듭니다. 다음 명령은 프로젝트 디렉터리에my-deployment-package.zip 파일을 생성합니다.
cd package
zip -r ../my-deployment-package.zip .
- 위의 zip 파일을 이용하여 Lambda 의 Layer을 생성합니다.
Step2. S3 트리거 및 Transcribe를 이용하기 위한 Lambda 함수 생성
- Lambda 함수를 생성합니다.
- 설정: 아래와 같이 설정한뒤에 생성합니다.
- Function name : TranscribeLambda
- Runtime : Python 3.11
- Use an existing role : AmazonTranscribeFullAccess, AmazonS3FullAccess 권한을 가진 Role과 연결합니다.
- Lambda를 생성한 리전에 있는 S3 버킷에 트리거를 설정한뒤 연결합니다.
- [Configuration], [General configuration]에서 timeout 값을 5분으로 지정합니다.
- pandas라이브러리를 사용하기에 [Code], 하단의 [Layers]에서 [Add a layer], [Specify an ARN]을 선택합니다.
- arn:aws:lambda:ap-northeast-2:770693421928:layer:Klayers-p312-pandas:8 해당 레이어를 추가합니다.
- 아래와 같이 코드를 입력한 뒤에 저장하고 [Deploy]합니다. 이때 <Stepfunction ARN>은 이후에 수정합니다.
import json
import time
import boto3
import pandas as pd
import datetime
from datetime import datetime
transcribe_client = boto3.client('transcribe')
stepfunctions_client = boto3.client('stepfunctions')
def lambda_handler(event, context):
records = json.loads(json.dumps(event))["Records"]
bucket_name = records[0]["s3"]["bucket"]["name"]
object_key = records[0]["s3"]["object"]["key"]
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
job_name = 'transcribe-job{}'.format(timestamp)
language_code = 'en-US'
media_format = 'wav'
response = transcribe_client.start_transcription_job(
TranscriptionJobName=job_name,
LanguageCode=language_code,
MediaFormat=media_format,
Media={
'MediaFileUri': f's3://{bucket_name}/{object_key}'
}
)
while True:
response = transcribe_client.get_transcription_job(TranscriptionJobName=job_name)
job_status = response['TranscriptionJob']['TranscriptionJobStatus']
if job_status in ['COMPLETED', 'FAILED']:
break
time.sleep(10)
if job_status == 'COMPLETED':
data = pd.read_json(response['TranscriptionJob']['Transcript']['TranscriptFileUri'])
raw_data=data["results"]["transcripts"][0]["transcript"]
json_str = json.dumps({"data":raw_data,"title":object_key})
print(json_str)
stateMachineArn=<Stepfunction ARN>
response = stepfunctions_client.start_execution(stateMachineArn=stateMachineArn,name=job_name,input=json_str,traceHeader='string')
else:
print("Transcription Job is not completed yet.")
Step3. Bedrock API를 사용하는 Lambda 함수 생성
- Lambda 함수를 아래의 설정을 이용하여 생성합니다.
- 설정: 아래와 같이 설정한뒤에 생성합니다.
- Function name : SF_BedrockLambda
- Runtime : Python 3.11
- Use an existing role : AmazonBedrockFullAccess, AmazonS3FullAccess 권한을 가진 Role과 연결합니다.
- 아래와 같이 코드를 입력한 뒤에 저장하고 [Deploy]합니다
import json
import boto3
def lambda_handler(event, context):
client = boto3.client("bedrock-runtime", region_name="us-east-1")
model_id = "anthropic.claude-3-haiku-20240307-v1:0"
message =event['data']
title=event['title']
prompt = "please summarize this text {} and extract the keyword. please express the JSON Format <JSON> </JSON>.".format(message)
native_request = {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 512,
"temperature": 0.5,
"messages": [
{
"role": "user",
"content": [{"type": "text", "text": prompt}],
}
],
}
# Convert the native request to JSON.
request = json.dumps(native_request)
try:
# Invoke the model with the request.
response = client.invoke_model(modelId=model_id, body=request)
model_response = json.loads(response["body"].read())
response_text = model_response["content"][0]["text"]
input_str=response_text
start_index = input_str.find('<JSON>') + 6
end_index = input_str.find('</JSON>')
json_str = input_str[start_index:end_index]
json_data = json.loads(json_str)
json_data['title']=title
return json_data
except Exception as e:
print(e)
exit(1)
Step4. Opensearch API를 사용하여 데이터 보내는 Lambda 함수 생성
- Lambda 함수를 아래의 설정을 이용하여 생성합니다.
- 설정: 아래와 같이 설정한뒤에 생성합니다.
- Function name : SF_OpensearchLambda
- Runtime : Python 3.11
- Use an existing role : AmazonOpenSearchServiceFullAccess 권한을 가진 Role과 연결합니다.
- requests 모듈을 사용하기에 첫번째 단계에서 생성한 레이어를 추가합니다.
- 아래의 코드에 host 명 및 index를 설정한 뒤에 저장하고 [Deploy]합니다.
import boto3
import json
import requests
from requests_aws4auth import AWS4Auth
def lambda_handler(event, context):
host = <domain endpoint>
region = 'ap-northeast-2'
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
index = <index>
datatype = '_doc'
url = host + index + '/' + datatype
headers = {"Content-Type": "application/json"}
document = {"title":event["title"],"summary": event["summary"], "keywords": event['keywords']}
r = requests.post(url, auth=awsauth, json=document, headers=headers) # Change method to POST
return event
Step5. StepFunctions 상태 머신 생성
- StepFunction 을 클릭한뒤 이전 단계에서 생성한 Lambda 함수를 아래와 같이 연결합니다.
- 생성한 StepFunction 의 IAM에 AdministratorAccess를 추가적으로 연결 합니다.
Step6. Streamlit 코드 생성
- Python IDE에서 아래와 같이 명령어를 입력하여 streamlit 라이브러리 및 boto3를 설치합니다.
pip install streamlit
pip install boto3
- StreamlitWeb.py파일을 생성한뒤에 아래의 코드를 입력하고 bucket name 및 opensearch domain을 변경합니다.
import streamlit as st
import boto3
from botocore.exceptions import NoCredentialsError
from requests_aws4auth import AWS4Auth
import requests
s3 = boto3.client('s3')
S3_BUCKET= '<bucket name>'
def upload_video_to_s3(file, bucket_name, file_name):
try:
s3.upload_fileobj(file, bucket_name, file_name)
return True
except NoCredentialsError:
st.error("AWS 자격 증명이 올바르지 않습니다.")
return False
def upload_page():
st.title("S3 동영상 업로드")
# 파일 업로드 기능
uploaded_file = st.file_uploader("동영상을 선택하세요", type=["mp4", "mov", "avi", "mkv"])
if uploaded_file is not None:
file_name = uploaded_file.name
if upload_video_to_s3(uploaded_file, S3_BUCKET, file_name):
st.success(f"{file_name}이(가) S3에 성공적으로 업로드되었습니다.")
else:
st.error("동영상 업로드에 실패했습니다.")
def search_opensearch(query):
host = '<opensearch domain>/_search'
region = 'ap-northeast-2' # e.g. us-west-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
# 검색 쿼리 정의 (summary와 tags에 대해 검색)
search_query = {
"query": {
"bool": {
"should": [
{"match": {"summary": query}},
{"match": {"keywords": query}}
]
}
}
}
headers = {"Content-Type": "application/json"}
response = requests.post(host, auth=awsauth, json=search_query, headers=headers)
if response.status_code == 200:
results = response.json()
return results['hits']['hits']
else:
st.error(f"Error: {response.status_code}")
return []
def search_page():
st.title("S3 동영상 검색")
# S3에 있는 동영상 파일 목록 가져오기
query = st.text_input("검색어를 입력하세요")
if st.button("검색"):
if query:
# 검색 수행
results = search_opensearch(query)
# 검색 결과 표시
if results:
for result in results:
source = result['_source']
st.subheader(source.get('title', 'No title'))
st.write(f"Summary: {source.get('summary', 'No summary')}")
st.write(f"Keywords: {source.get('keywords', 'No keywords')}")
st.write("---")
else:
st.write("검색 결과가 없습니다.")
else:
st.write("검색어를 입력하세요.")
def main():
st.sidebar.title("Navigation")
page = st.sidebar.selectbox("Page", ["Upload", "Search"])
if page == "Upload":
upload_page()
elif page == "Search":
search_page()
if __name__ == "__main__":
main()
Step7. 테스트
- 아래의 명령어를 통해 py 파일을 실행합니다.
- streamlit run .\StreamlitWeb.py
- 동영상 업로드 페이지
- 동영상 검색 페이지
참고 자료
[+] Lambda를 사용하는 Step Functions 상태 시스템 만들기
언어
한국어
댓글 없음
관련 콘텐츠
- 질문됨 일 년 전lg...
- 질문됨 5달 전lg...
- 질문됨 일 년 전lg...
- AWS 공식업데이트됨 2년 전
- AWS 공식업데이트됨 2년 전