AWS Athena를 Python SDK로 이용하는 방법과 조회된 테이블을 Pandas 데이터프레임으로 변환하는 방법

2분 분량
콘텐츠 수준: 중급
0

본 기사에서는 콘솔을 접근하지 않고 Athena를 python에서 이용하는 방법과 조회된 테이블을 pandas의 데이터프레임으로 변경하는 방법에 대해 제공해드립니다.

Athena는 표준 SQL 을 사용하여 Amazon S3에 있는 데이터를 직접 간편하게 분석할 수 있는 대화형 쿼리 서비스입니다. AWS Management Console에서 몇 가지 작업을 수행하면 Athena에서 Amazon S3에 저장된 데이터를 지정하고 표준 SQL을 사용하여 임시 쿼리를 실행하여 몇 초 안에 결과를 얻을 수 있습니다. Athena 사용시 일반적으로 콘솔에 접근하여 사용을 합니다. 이 기사에서는 콘솔에 접근하지 않고 Python에서 간단히 Athena의 SDK를 이용하여 조회하고 조회된 데이터의 포맷을 바꾸는 방법을 알려드립니다.

전제조건

  • Glue 를 이용하여 기존의 데이터베이스가 존재해야합니다.
  • 아래와 같은 권한을 가진 사용자로 AWS CLI 자격증명을 해야합니다.
    • AmazonAthenaFullAccess,AmazonS3FullAccess
  • 본 기사에서는 ratings.csv 예시 데이터를 이용합니다.

Athena에서 Python SDK를 이용하여 SQL 쿼리를 사용하는 방법

  • Python IDE(Pycharm, Visual Studio code)를 열어 아래의 코드를 입력합니다.
  • 아래의 코드 및 쿼리를 자신의 환경에 맞추어 수정합니다.
    • 아래의 예시는 각 영화의 평균 평점을 구하는 쿼리 예시입니다.
import boto3

athena_client=boto3.client('athena')

database_name = '<데이터 베이스 이름>'
table_name = '<테이블 이름>'

# 실행할 SQL 쿼리문 
query= 'SELECT movieid,ROUND(avg(rating),1) as "average of rating" FROM "{}"."{}" group by movieid'.format(database_name,table_name)
response = athena_client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={
        'Database': database_name
    },
    ResultConfiguration={
        'OutputLocation': '<결과가 저장될 S3 경로>'
    }
)
query_execution_id = response['QueryExecutionId']

# 쿼리 실행 완료를 확인
while True:
    query_status = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
    if query_status['QueryExecution']['Status']['State'] in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
        state=query_status['QueryExecution']['Status']['State']
        break

if state == 'SUCCEEDED':
    results = athena_client.get_query_results(QueryExecutionId=query_execution_id)
    columns = [col['Name'] for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']]
    data = [row['Data'] for row in results['ResultSet']['Rows']]
    
    # 결과 출력
    for row in data:
        row_data = [item.get('VarCharValue', '') for item in row]
        print(dict(zip(columns, row_data)))
else:
    print(f"Query execution failed with state: {state}")

Enter image description here

추가: 일부 코드를 수정하여 테이블을 데이터프레임으로 변환하는 방법

  • 쿼리를 변경하여 SDK 에서 바로 분석가능
  • 아래의 코드를 이용하여 Pandas 라이브러리의 DataFrame으로 변경하여 분석하는데 이용할 수 있습니다.
import pandas as pd

...
..

if state == 'SUCCEEDED':
    results = athena_client.get_query_results(QueryExecutionId=query_execution_id)
    columns = [col['Name'] for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']]
    data = [row['Data'] for row in results['ResultSet']['Rows']]
    movie_list,rating_lst=[],[]
    for row in data:
        row_data = [item.get('VarCharValue', '') for item in row]
        movie_list.append(row_data[0])
        rating_lst.append(row_data[1])
    data_frame={columns[0]:movie_list,columns[1]:rating_lst}
    df=pd.DataFrame(data_frame)
    df.columns=df.iloc[0]
    df=df[1:]
    print(df)

Enter image description here

profile pictureAWS
지원 엔지니어
게시됨 일 년 전567회 조회