When I run Amazon Athena queries for Amazon Simple Storage Service (Amazon S3) access logs, the query times out. I want to troubleshoot this issue.
Resolution
Amazon S3 access logs are all delivered under the same prefix. If the prefix contains a large amount of data, then Athena queries might time out before Athena reads all of the log objects. To prevent this issue, use an AWS Glue extract, transform, and load (ETL) job to partition your Amazon S3 access log data. Then, run Athena queries against only the partitions that you need.
Note: For the example table, script, and commands, replace the following values with your values where necessary:
- s3_access_logs_db with the name of your database
- s3://awsexamplebucket1-logs/prefix/ with the path that stores your Amazon S3 access logs
- s3_access_logs with the name of your table
- s3_access_logs_partitioned with the name of your partitioned table
- 2023, 03, and 04 with your partition values
Partition Amazon S3 data
Create the following table in Athena:
CREATE EXTERNAL TABLE `s3_access_logs_db.s3_access_logs`(
`bucketowner` string,
`bucket_name` string,
`requestdatetime` string,
`remoteip` string,
`requester` string,
`requestid` string,
`operation` string,
`key` string,
`request_uri` string,
`httpstatus` string,
`errorcode` string,
`bytessent` string,
`objectsize` string,
`totaltime` string,
`turnaround_time` string,
`referrer` string,
`useragent` string,
`version_id` string,
`hostid` string,
`sigv` string,
`ciphersuite` string,
`authtype` string,
`endpoint` string,
`tlsversion` string,
`accesspoint_arn` string,
`aclrequired` string)
ROW FORMAT SERDE
'com.amazonaws.glue.serde.GrokSerDe'
WITH SERDEPROPERTIES (
'input.format'='%{NOTSPACE:bucketowner} %{NOTSPACE:bucket_name} \\[%{INSIDE_BRACKETS:requestdatetime}\\] %{NOTSPACE:remoteip} %{NOTSPACE:requester} %{NOTSPACE:requestid} %{NOTSPACE:operation} %{NOTSPACE:key} \"%{INSIDE_QS:request_uri}\" %{NOTSPACE:httpstatus} %{NOTSPACE:errorcode} %{NOTSPACE:bytessent} %{NOTSPACE:objectsize} %{NOTSPACE:totaltime} %{NOTSPACE:turnaround_time} \"?%{INSIDE_QS:referrer}\"? \"%{INSIDE_QS:useragent}\" %{NOTSPACE:version_id} %{NOTSPACE:hostid} %{NOTSPACE:sigv} %{NOTSPACE:ciphersuite} %{NOTSPACE:authtype} %{NOTSPACE:endpoint} %{NOTSPACE:tlsversion}( %{NOTSPACE:accesspoint_arn} %{NOTSPACE:aclrequired})?',
'input.grokCustomPatterns'='INSIDE_QS ([^\"]*)\nINSIDE_BRACKETS ([^\\]]*)')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://awsexamplebucket1-logs/prefix/';
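Before you build the ETL job, you can optionally confirm that the Grok SerDe parses your logs as expected by running a small test query against the table. The following is a minimal sketch that runs the query through the Athena API with boto3. The test query, the s3://awsexamplebucket1-athena-results/ output location, and the polling logic are assumptions for illustration; adjust them for your environment.
import time

import boto3

athena = boto3.client("athena")

# Hypothetical test query: read a few rows to confirm that the Grok SerDe
# parses the access log fields. Replace the output location with an
# Amazon S3 bucket that you own.
QUERY = "SELECT requestdatetime, operation, key, httpstatus FROM s3_access_logs LIMIT 10"

response = athena.start_query_execution(
    QueryString=QUERY,
    QueryExecutionContext={"Database": "s3_access_logs_db"},
    ResultConfiguration={"OutputLocation": "s3://awsexamplebucket1-athena-results/"},
)
query_id = response["QueryExecutionId"]

# Poll until the query finishes, then print the returned rows.
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(2)

if state == "SUCCEEDED":
    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    for row in rows:
        print([field.get("VarCharValue") for field in row["Data"]])
else:
    print(f"Query ended in state {state}")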
Create an AWS Glue ETL job
Complete the following steps:
- Open the AWS Glue console.
- Choose ETL jobs, and then choose Spark script editor.
- Choose Create.
- On the Script tab, enter the following script:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import split, col, size
from awsglue.dynamicframe import DynamicFrame
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
dyf = glueContext.create_dynamic_frame.from_catalog(database='s3_access_logs_db', table_name='s3_access_logs', transformation_ctx = 'dyf',additional_options = {"attachFilename": "s3path"})
df = dyf.toDF()
df2=df.withColumn('filename',split(col("s3path"),"/"))\
.withColumn('year',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(0))\
.withColumn('month',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(1))\
.withColumn('day',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(2))\
.drop('s3path','filename')
output_dyf = DynamicFrame.fromDF(df2, glue_ctx=glueContext, name = 'output_dyf')
partitionKeys = ['year', 'month', 'day']
sink = glueContext.getSink(connection_type="s3", path='s3://awsexamplebucket2-logs/prefix/',
enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE",
partitionKeys=partitionKeys)
sink.setFormat("glueparquet")
sink.setCatalogInfo(catalogDatabase='s3_access_logs_db', catalogTableName='s3_access_logs_partitioned')
sink.writeFrame(output_dyf)
job.commit()
- On the Job details tab, enter a name for your job, and then choose an IAM role.
- Choose Save, and then choose Run.
Note: Amazon S3 access logs are regularly delivered. To set time-based schedules for the AWS Glue ETL job, add a trigger. Also, turn on job bookmarks.
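If you prefer to script this setup, the following is a minimal sketch that turns on job bookmarks and creates a time-based trigger with boto3. The s3-access-logs-partitioning job name, the daily cron expression, and the reuse of the job's existing role and command are assumptions for illustration.
import boto3

glue = boto3.client("glue")

JOB_NAME = "s3-access-logs-partitioning"  # assumed name of the ETL job that you created

# Turn on job bookmarks so that reruns process only newly delivered log objects.
job = glue.get_job(JobName=JOB_NAME)["Job"]
glue.update_job(
    JobName=JOB_NAME,
    JobUpdate={
        "Role": job["Role"],
        "Command": job["Command"],
        "DefaultArguments": {
            **job.get("DefaultArguments", {}),
            "--job-bookmark-option": "job-bookmark-enable",
        },
    },
)

# Create a scheduled trigger that runs the job once each day at 01:00 UTC.
glue.create_trigger(
    Name="s3-access-logs-partitioning-daily",
    Type="SCHEDULED",
    Schedule="cron(0 1 * * ? *)",
    Actions=[{"JobName": JOB_NAME}],
    StartOnCreation=True,
)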
Create a DynamicFrame object and a partitioned table
After you create the Amazon S3 access log table, create a DynamicFrame object that contains the Amazon S3 access logs. Then, use the DynamicFrame object to create a partitioned table with year, month, and day partition keys.
Complete the following steps:
- Run the following command to create a DynamicFrame object and scan the Amazon S3 access logs table:
dyf = glueContext.create_dynamic_frame.from_catalog(database='s3_access_logs_db', table_name='s3_access_logs', transformation_ctx='dyf', additional_options={"attachFilename": "s3path"})
Note: The value that you set for the attachFilename option is used as the name of the column that contains the Amazon S3 path of each source log object.
- Run the following command to create year, month, and day columns from the path to your Amazon S3 access logs (the date comes from the log object's key, as shown in the sketch after these steps):
df = dyf.toDF()
df2=df.withColumn('filename',split(col("s3path"),"/"))\
.withColumn('year',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(0))\
.withColumn('month',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(1))\
.withColumn('day',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(2))\
.drop('s3path','filename')
- Run the following command to create a partitioned table for the Amazon S3 access logs:
output_dyf = DynamicFrame.fromDF(df2, glue_ctx=glueContext, name = 'output_dyf')
partitionKeys = ['year', 'month', 'day']
sink = glueContext.getSink(connection_type="s3", path='s3://awsexamplebucket2-logs/prefix/',
enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE",
partitionKeys=partitionKeys)
sink.setFormat("glueparquet")
sink.setCatalogInfo(catalogDatabase='s3_access_logs_db', catalogTableName='s3_access_logs_partitioned')
sink.writeFrame(output_dyf)
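The year, month, and day values come from the log object's key. Amazon S3 delivers server access logs with object keys in the format TargetPrefixYYYY-mm-DD-HH-MM-SS-UniqueString, so when the target prefix ends with a slash, the first three hyphen-separated parts of the file name are the delivery date. The following plain Python sketch mirrors the PySpark split logic; the example path and unique string are hypothetical.
# Hypothetical example path attached by the "attachFilename" option.
s3path = "s3://awsexamplebucket1-logs/prefix/2023-03-04-21-32-16-EXAMPLE0123456789"

# Same logic as the withColumn calls: take the last path segment, then split on "-".
filename = s3path.split("/")[-1]
parts = filename.split("-")
year, month, day = parts[0], parts[1], parts[2]

print(year, month, day)  # 2023 03 04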
Query the partitioned table
- Open the Athena console.
- Run the following query to confirm that the table is partitioned:
SELECT * FROM "s3_access_logs_db"."s3_access_logs_partitioned" WHERE year = '2023' AND month = '03' AND day = '04'
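You can also confirm that the ETL job registered the partitions in the AWS Glue Data Catalog. The following is a minimal sketch that lists a few partitions with boto3; the output formatting is an assumption for illustration.
import boto3

glue = boto3.client("glue")

# List partitions that the ETL job registered for the partitioned table.
response = glue.get_partitions(
    DatabaseName="s3_access_logs_db",
    TableName="s3_access_logs_partitioned",
    MaxResults=10,
)

for partition in response["Partitions"]:
    # Values are in partition-key order: year, month, day.
    print(partition["Values"], partition["StorageDescriptor"]["Location"])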