해당 기사에서는 Kinesis 데이터 스트림에 데이터가 적재 되면 즉시 이를 트리거 삼아 Lambda 함수에서 받는 방법을 알려드립니다.
사용 사례
Amazon Kinesis Data Streams를 사용하여 대규모 데이터를 수집하고 처리할 수 있습니다. Amazon Kinesis Data Streams를 이용하여 실시간 측정치 및 보고, 실시간 데이터 분석 및 복잡한 스트림처리등이 가능합니다. 본 기사에서는 Kinesis Data Streams에 데이터가 적재되는 경우 이를 Lambda 에서 감지하여 확인하는 방법을 안내합니다. 이를 통해서 Lambda에서 DynamoDB,S3 등의 스토리지에 저장할 수 있습니다.
아래의 예시는 간단하게 timestamp,user_id,click_id를 데이터 스트림에 적재하고 적재된 데이터는 실시간으로 Lambda에서 트리거하여 데이터를 CloudWatchLogs에서 보여주는 예시입니다.
전제 조건
- Pycharm과 같은 Python IDE가 설치 되어 있어야합니다.
- aws cli가 설치되어야 있어야 하며 자격증명시에는 아래의 권한이 IAM 사용자에 포함되어 있어야합니다.[1][2]
1단계 : KinesisDataStream 생성
- [Amazon Kinesis],[데이터 스트림]에서 [데이터 스트림 생성]을 클릭합니다.
- 데이터 스트림 이름에는 “DemoStream” 입력하고 프로비저닝을 클릭하여 샤드를 1개로 지정하여 데이터 스트림을 생성합니다.
Step2. Lambda 실행시 필요한 IAM 역할 생성
- IAM페이지에 접속하여 [역할],[역할 생성], [Lambda] 아래의 두 권한을 넣어줍니다.
- CloudWatchFullAccess, AmazonKinesisReadOnlyAccess
- 역할 세부정보에서 역할의 이름 “KinesisLambdaRole” 으로 지정하여 역할을 생성합니다.
Step3. Lambda 실행시 필요한 IAM 역할 생성
- Lambda 페이지에 접속하여 [함수],[함수 생성]을 클릭합니다.
- 함수이름에는 DemoLambda을 입력하고 런타임의 경우 Python최신버전(3.11)을 선택합니다.
- 기본 실행역할 변경에서 [기존 역할 사용],[KinesisLambdaRole]를 선택한뒤 함수를 생성합니다.
Step4. Lambda 함수 설정하기
- [트리거 추가],[Kinesis] 이전에 생성한 [DemoStream]을 클릭한뒤 저장합니다.
- Lambda 함수의 코드를 아래와 같이 수정합니다.
import json
import boto3
import base64
def lambda_handler(event, context):
encoded_data=event['Records'][0]['kinesis']['data']
decoded_data = base64.b64decode(encoded_data).decode('utf-8')
json_data=json.loads(decoded_data)
print(json_data)
Step5. KinesisDataStream에 데이터를 넣기
- Python IDE에서 boto3를 pip를 이용하여 설치합니다.
- 이후 아래의 코드를 입력하여 KinesisDataStream에 데이터를 넣습니다.
import json
import random
import boto3
from datetime import datetime
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
stream_name = 'DemoStream'
kinesis_client = boto3.client('kinesis')
def create_data():
data = {
"timestamp":timestamp,
"used_id":"1",
"click_id": random.randrange(1,10)
}
return data
def put_data_into_kinesis(data):
data = json.dumps(data)
partition_key = "DemoPartitionKey"
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
sample_data = create_data()
put_data_into_kinesis(sample_data)
Step6. CloudWatch Logs를 통해서 확인
- Lambda함수에서 [모니터링]탭을 클릭한뒤 [CloudWatchLogs 보기]를 클릭합니다.
- 위의 단계를 실행시켜 데이터를 적재한뒤 확인하면 아래처럼 DemoStream에 실시간으로 저장된 데이터를 확인할수있습니다.
참고 자료
[1] AWS CLI 설치 및 업데이트 지침
https://docs.aws.amazon.com/ko_kr/cli/latest/userguide/getting-started-install.html#getting-started-install-instructions
[2] 구성 설정 지정 및 보기
https://docs.aws.amazon.com/ko_kr/cli/latest/userguide/cli-configure-files.html#cli-configure-files-methods