본 기사에서는 콘솔을 접근하지 않고 Athena를 python에서 이용하는 방법과 조회된 테이블을 pandas의 데이터프레임으로 변경하는 방법에 대해 제공해드립니다.
Athena는 표준 SQL 을 사용하여 Amazon S3에 있는 데이터를 직접 간편하게 분석할 수 있는 대화형 쿼리 서비스입니다. AWS Management Console에서 몇 가지 작업을 수행하면 Athena에서 Amazon S3에 저장된 데이터를 지정하고 표준 SQL을 사용하여 임시 쿼리를 실행하여 몇 초 안에 결과를 얻을 수 있습니다. Athena 사용시 일반적으로 콘솔에 접근하여 사용을 합니다. 이 기사에서는 콘솔에 접근하지 않고 Python에서 간단히 Athena의 SDK를 이용하여 조회하고 조회된 데이터의 포맷을 바꾸는 방법을 알려드립니다.
전제조건
- Glue 를 이용하여 기존의 데이터베이스가 존재해야합니다.
- 아래와 같은 권한을 가진 사용자로 AWS CLI 자격증명을 해야합니다.
- AmazonAthenaFullAccess,AmazonS3FullAccess
- 본 기사에서는 ratings.csv 예시 데이터를 이용합니다.
Athena에서 Python SDK를 이용하여 SQL 쿼리를 사용하는 방법
- Python IDE(Pycharm, Visual Studio code)를 열어 아래의 코드를 입력합니다.
- 아래의 코드 및 쿼리를 자신의 환경에 맞추어 수정합니다.
- 아래의 예시는 각 영화의 평균 평점을 구하는 쿼리 예시입니다.
import boto3
athena_client=boto3.client('athena')
database_name = '<데이터 베이스 이름>'
table_name = '<테이블 이름>'
# 실행할 SQL 쿼리문
query= 'SELECT movieid,ROUND(avg(rating),1) as "average of rating" FROM "{}"."{}" group by movieid'.format(database_name,table_name)
response = athena_client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': database_name
},
ResultConfiguration={
'OutputLocation': '<결과가 저장될 S3 경로>'
}
)
query_execution_id = response['QueryExecutionId']
# 쿼리 실행 완료를 확인
while True:
query_status = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
if query_status['QueryExecution']['Status']['State'] in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
state=query_status['QueryExecution']['Status']['State']
break
if state == 'SUCCEEDED':
results = athena_client.get_query_results(QueryExecutionId=query_execution_id)
columns = [col['Name'] for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']]
data = [row['Data'] for row in results['ResultSet']['Rows']]
# 결과 출력
for row in data:
row_data = [item.get('VarCharValue', '') for item in row]
print(dict(zip(columns, row_data)))
else:
print(f"Query execution failed with state: {state}")
추가: 일부 코드를 수정하여 테이블을 데이터프레임으로 변환하는 방법
- 쿼리를 변경하여 SDK 에서 바로 분석가능
- 아래의 코드를 이용하여 Pandas 라이브러리의 DataFrame으로 변경하여 분석하는데 이용할 수 있습니다.
import pandas as pd
...
..
if state == 'SUCCEEDED':
results = athena_client.get_query_results(QueryExecutionId=query_execution_id)
columns = [col['Name'] for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']]
data = [row['Data'] for row in results['ResultSet']['Rows']]
movie_list,rating_lst=[],[]
for row in data:
row_data = [item.get('VarCharValue', '') for item in row]
movie_list.append(row_data[0])
rating_lst.append(row_data[1])
data_frame={columns[0]:movie_list,columns[1]:rating_lst}
df=pd.DataFrame(data_frame)
df.columns=df.iloc[0]
df=df[1:]
print(df)