실시간 비디오를 Rekognition을 이용하여 물체를 인식하고 해당 정보를 Kinesis Data Stream에 저장하여 특정 사물 발견시 Lambda에서 감지하여 SES로 경고 메시지를 보내는 방법
4 minute read
Content level: Advanced
0
물체 감지 및 경고 시스템은 보안, 안전, 자동화 및 감시 기능을 강화하여 다양한 분야에서 유용하게 활용될 수 있습니다. 해당 가이드에서는 AWS 다양한 서비스들을 이용하여 실시간으로 비디오 데이터를 처리하는 방법에 대해서 가이드 해드립니다.
아키텍처
파이썬의 오픈라이브러리인 Opencv를 이용하여 웹캠을 실행합니다. 실행되어진 웹캡에서 일정한 간격으로 이미지를 생성하며 생성되어진 이미지를 Amazon Rekognition을 이용하여 라벨링 분류합니다. 분류된 라벨을 Kinesis Data Stream에 저장합니다. 저장 되어진 정보들은 Lambda의 트리거를 이용하여 감지됩니다. 이때 허용되지 않는 데이터(사물)가 있을경우 SES 를 이용하여 경고 메시지를 보냅니다.
사용사례
해당 가이드는 아래의 사용사례에 사용하실 수 있습니다.
- 안전: 위험한 상황에서 물체를 감지하고 경고를 보내는 것은 안전을 유지하는 데 도움이 됩니다. 예를 들어, 화재가 발생하거나 위험한 물체가 감지되면 즉시 경고를 받아 조치를 취할 수 있습니다.
- 자동화: 물체 감지 및 경고 시스템을 사용하면 일상적인 작업을 자동화할 수 있습니다. 예를 들어, 특정 물체가 감지되면 일련의 작업이 자동으로 실행되도록 설정할 수 있습니다.
사전 준비사항
- Kinesis Data Stream에 필요 합니다.
- AWS CLI에서 자격증명 필요 (해당 가이드에서는 Administartor권한 사용)
- 발신자와 송신자의 SES 자격증명 인증 필요 [1] SES를 이용하기 위해서는 발신자 및 송신자의 자격증명이 필요합니다.
- Lambda에서 사용할 IAM 역할 생성 필요 (AmazonKinesisReadOnlyAccess, AmazonSESFullAccess)
단계1. Webcam 실행 및 데이터 처리 코드
- cv2,boto3 라이브러리를 사용하기 위해 PythonIDE창에서 아래의 라이브러리를 설치합니다.
pip install opencv-python
pip install boto3
- 프로젝트를 생성하고 SendVedioToStream.py 라는 파이썬 파일을 생성합니다.
- 아래와 같이 저장할 폴더를 생성 및 지정합니다.해당 가이드에서는 captured_images 라는 폴더를 이용합니다.
- 생성한 SendVedioToStream.py 파일에 코드를 아래의 코드를 입력합니다. (아래의 코드는 Webcam을 실행시키고 3초마다 webcam에 이미지를 생성하고 이미지를 rekognition으로 감지하여 인식된 물체들에 대한 정보를 kinesis data stream에 보내는 코드입니다.)
import cv2
import time
import boto3
import json
from datetime import datetime
current_time = datetime.now()
timestamp = current_time.strftime("%Y.%m.%d %H:%M")
stream_name = '<kinesis stream 이름>'
kinesis_client = boto3.client('kinesis')
rekognition_client = boto3.client('rekognition')
def put_data_into_kinesis(data):
data = json.dumps(data)
partition_key = "DemoPartitionKey"
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
output_directory = 'captured_images/'
image_count = 0
while True:
ret, frame = cap.read()
if not ret:
break
if time.time() % 3 < 0.1:
image_count += 1
image_name = f'{output_directory}image_{image_count}.jpg'
cv2.imwrite(image_name, frame)
with open(image_name, 'rb') as image:
response = rekognition_client.detect_labels(Image={'Bytes': image.read()})
lst = []
for label in response['Labels']:
if int(label["Confidence"]) > 90:
lst.append(label["Name"])
keywords = " ".join(lst)
data={"image_name":image_name,"keywords":keywords,"timestamp":timestamp}
put_data_into_kinesis(data)
cap.release()
cv2.destroyAllWindows()
2단계 : Lambda 함수 생성
- [함수 이름] 원하는 함수이름(Kinesis_Lambda) 을 입력합니다.
- [런타임],[Python 3.11]로 최신버전의 파이썬으로 지정합니다.
- 이전 단계에서 사전 준비사항에서 생성한 IAM 권한을 연결합니다.
- 나머지 사항은 그대로 두고 [함수 생성]을 클릭하여 함수를 생성합니다.
3단계: Lambda 트리거 설정
- 2단계에서 생성한 Lambda 함수를 클릭한후에 [트리거 추가]를 클릭합니다.
- [트리거 구성],[소스 선택], 사용할 kinesis stream을 선택합니다.
- 해당 가이드에서는 Batch size, Batch window를 각 각 1로 설정합니다.
- 트리거 설정이 완료되면 아래와 같이 트리거 설정이 완료됩니다.
4단계. Lambda 코드 수정
- 생성한 Lambda 함수를 클릭하고 [코드]를 클릭하여 아래의 코드를 삽입합니다.
- 아래의 수신자 이메일과, 발신자 이메일을 SES에 자격증명된 이메일을 넣습니다.
- 아래의 코드는 KinesisDataStream의 트리거를 통해 데이터를 받아들이고 허용되지 않은 물품이 발견되면 SES를 이용하여 경고 메시지를 보냅니다. Prohibited_items을 수정하여 허용되지 않는 물품을 수정할 수 있습니다.
import json
import boto3
import base64
client_ses = boto3.client('ses')
def send_warning_message(data):
response = client_ses.send_email(
Destination={
'ToAddresses': ['<수신자 이메일1>','<수신자 이메일2>','<수신자 이메일3>']
},
Message={
'Body': {
'Text': {
'Charset': 'UTF-8',
'Data': data,
}
},
'Subject': {
'Charset': 'UTF-8',
'Data': '[경고 알림] 금지된 물건이 확인되었습니다.',
},
},
Source='발신자 이메일'
)
def lambda_handler(event, context):
encoded_data=event['Records'][0]['kinesis']['data']
decoded_data = base64.b64decode(encoded_data).decode('utf-8')
json_data=json.loads(decoded_data)
keywords=json_data['keywords']
timestamp=json_data['timestamp']
Prohibited_items=["Weapon","Knife","Gun","Pen","Computer"]
warning=False
for item in Prohibited_items:
if item in keywords:
warning=True
warning_item=item
message="!경고: "+timestamp+"에 "+warning_item+"이 확인되었습니다."
send_warning_message(message)
break
if warning:
print("they have a prohibited item")
5단계: 경고 메시지 테스트
- 금지된 물품을 인식하면 SES 에서 알림 메시지를 발송할 수 있습니다.
- 아래는 컴퓨터를 Webcam에 비추고 그에 따라 성공적이게 경고 메시지를 받을 수 있음을 확인하였습니다.
참고
[1] Amazon SES에서 자격 증명 생성 및 확인: 이메일 주소 자격 증명 생성
[+] Boto3 : Kinesis / Client / put_record
[+] Boto3 : Rekognition / Client / detect_labels
[+] Boto3 : SES / Client / send_email
[+] Lambda의 트리거를 이용하여 Kinesis Data stream의 데이터를 받는 방법
[+] EventBridge에서 EC2 의 상태변화를 감지하여 상태변경 메일을 보내는 방법
Language
한국어
No comments
Relevant content
- asked a year agolg...
- asked 4 months agolg...
- AWS OFFICIALUpdated 5 months ago
- AWS OFFICIALUpdated 2 years ago
- AWS OFFICIALUpdated 3 months ago
- AWS OFFICIALUpdated 6 months ago