Usando AWS re:Post, accetti AWS re:Post Termini di utilizzo

Lambda의 트리거를 이용하여 Kinesis Data stream의 데이터를 받는 방법

2 minuti di lettura
Livello di contenuto: Intermedio
0

해당 기사에서는 Kinesis 데이터 스트림에 데이터가 적재 되면 즉시 이를 트리거 삼아 Lambda 함수에서 받는 방법을 알려드립니다.

사용 사례

Amazon Kinesis Data Streams를 사용하여 대규모 데이터를 수집하고 처리할 수 있습니다. Amazon Kinesis Data Streams를 이용하여 실시간 측정치 및 보고, 실시간 데이터 분석 및 복잡한 스트림처리등이 가능합니다. 본 기사에서는 Kinesis Data Streams에 데이터가 적재되는 경우 이를 Lambda 에서 감지하여 확인하는 방법을 안내합니다. 이를 통해서 Lambda에서 DynamoDB,S3 등의 스토리지에 저장할 수 있습니다.

아래의 예시는 간단하게 timestamp,user_id,click_id를 데이터 스트림에 적재하고 적재된 데이터는 실시간으로 Lambda에서 트리거하여 데이터를 CloudWatchLogs에서 보여주는 예시입니다.

전제 조건

  • Pycharm과 같은 Python IDE가 설치 되어 있어야합니다.
  • aws cli가 설치되어야 있어야 하며 자격증명시에는 아래의 권한이 IAM 사용자에 포함되어 있어야합니다.[1][2]
    • AmazonKinesisFullAccess

1단계 : KinesisDataStream 생성

  • [Amazon Kinesis],[데이터 스트림]에서 [데이터 스트림 생성]을 클릭합니다.
  • 데이터 스트림 이름에는 “DemoStream” 입력하고 프로비저닝을 클릭하여 샤드를 1개로 지정하여 데이터 스트림을 생성합니다.

Enter image description here

Step2. Lambda 실행시 필요한 IAM 역할 생성

  • IAM페이지에 접속하여 [역할],[역할 생성], [Lambda] 아래의 두 권한을 넣어줍니다.
    • CloudWatchFullAccess, AmazonKinesisReadOnlyAccess
  • 역할 세부정보에서 역할의 이름 “KinesisLambdaRole” 으로 지정하여 역할을 생성합니다.

Enter image description here

Step3. Lambda 실행시 필요한 IAM 역할 생성

  • Lambda 페이지에 접속하여 [함수],[함수 생성]을 클릭합니다.
  • 함수이름에는 DemoLambda을 입력하고 런타임의 경우 Python최신버전(3.11)을 선택합니다.
  • 기본 실행역할 변경에서 [기존 역할 사용],[KinesisLambdaRole]를 선택한뒤 함수를 생성합니다.

Step4. Lambda 함수 설정하기

  • [트리거 추가],[Kinesis] 이전에 생성한 [DemoStream]을 클릭한뒤 저장합니다.
  • Lambda 함수의 코드를 아래와 같이 수정합니다.
import json
import boto3
import base64


def lambda_handler(event, context):
    encoded_data=event['Records'][0]['kinesis']['data']
    decoded_data = base64.b64decode(encoded_data).decode('utf-8')
    
    json_data=json.loads(decoded_data)
   
    print(json_data)

Step5. KinesisDataStream에 데이터를 넣기

  • Python IDE에서 boto3를 pip를 이용하여 설치합니다.
  • 이후 아래의 코드를 입력하여 KinesisDataStream에 데이터를 넣습니다.
import json
import random
import boto3
from datetime import datetime

timestamp = datetime.now().strftime('%Y%m%d%H%M%S')

stream_name = 'DemoStream'
kinesis_client = boto3.client('kinesis')

def create_data():
    data = {
        "timestamp":timestamp,
        "used_id":"1",
        "click_id": random.randrange(1,10)
    }
    return data

def put_data_into_kinesis(data):
    data = json.dumps(data)
    partition_key = "DemoPartitionKey"
    response = kinesis_client.put_record(
        StreamName=stream_name,
        Data=data,
        PartitionKey=partition_key
    )

sample_data = create_data()
put_data_into_kinesis(sample_data)

Step6. CloudWatch Logs를 통해서 확인

  • Lambda함수에서 [모니터링]탭을 클릭한뒤 [CloudWatchLogs 보기]를 클릭합니다.
  • 위의 단계를 실행시켜 데이터를 적재한뒤 확인하면 아래처럼 DemoStream에 실시간으로 저장된 데이터를 확인할수있습니다.

Enter image description here

참고 자료

[1] AWS CLI 설치 및 업데이트 지침

https://docs.aws.amazon.com/ko_kr/cli/latest/userguide/getting-started-install.html#getting-started-install-instructions

[2] 구성 설정 지정 및 보기

https://docs.aws.amazon.com/ko_kr/cli/latest/userguide/cli-configure-files.html#cli-configure-files-methods