How do I use a partitioned Amazon S3 access log to prevent an Athena query timeout?

4 minute read
0

When I run Athena queries for Amazon Simple Storage Service (Amazon S3) access logs, the query times out.

Short description

Because all Amazon S3 access logs are stored with the same prefix, Amazon Athena might not be able to read a large amount of data. To prevent this issue, Use an AWS Glue ETL job to partition your Amazon S3 data. Then, run Athena queries on limited partitions.

Resolution

Partition Amazon S3 data

Create the following table in Athena. Replace s3_access_logs_dbReplace with the name of your table. Replace s3://awsexamplebucket1-logs/prefix/ with the path that stores your Amazon S3 access logs: 

CREATE EXTERNAL TABLE `s3_access_logs_db.s3_access_logs`(  
  `bucketowner` string,
  `bucket_name` string,
  `requestdatetime` string,
  `remoteip` string,
  `requester` string,
  `requestid` string,
  `operation` string,
  `key` string,
  `request_uri` string,
  `httpstatus` string,
  `errorcode` string,
  `bytessent` string,
  `objectsize` string,
  `totaltime` string,
  `turnaround_time` string,
  `referrer` string,
  `useragent` string,
  `version_id` string,
  `hostid` string,
  `sigv` string,
  `ciphersuite` string,
  `authtype` string,
  `endpoint` string,
  `tlsversion` string,
  `accesspoint_arn` string,
  `aclrequired` string)
ROW FORMAT SERDE 
  'com.amazonaws.glue.serde.GrokSerDe' 
WITH SERDEPROPERTIES ( 
  'input.format'='%{NOTSPACE:bucketowner} %{NOTSPACE:bucket_name} \\[%{INSIDE_BRACKETS:requestdatetime}\\] %{NOTSPACE:remoteip} %{NOTSPACE:requester} %{NOTSPACE:requestid} %{NOTSPACE:operation} %{NOTSPACE:key} \"%{INSIDE_QS:request_uri}\" %{NOTSPACE:httpstatus} %{NOTSPACE:errorcode} %{NOTSPACE:bytes_sent} %{NOTSPACE:objectsize} %{NOTSPACE:totaltime} %{NOTSPACE:turnaround_time} \"?%{INSIDE_QS:referrer}\"? \"%{INSIDE_QS:useragent}\" %{NOTSPACE:version_id} %{NOTSPACE:hostid} %{NOTSPACE:sigv} %{NOTSPACE:ciphersuite} %{NOTSPACE:authtype} %{NOTSPACE:endpoint} %{NOTSPACE:tlsversion} %{NOTSPACE:accesspoint_arn} %{NOTSPACE:aclrequired}', 
  'input.grokCustomPatterns'='INSIDE_QS ([^\"]*)\nINSIDE_BRACKETS ([^\\]]*)') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://awsexamplebucket1-logs/prefix/';

Create a Glue ETL job

  1. Open the AWS Glue console.

  2. Choose ETL jobs. Then, choose Spark script editor.

  3. Choose Create.

  4. In the script tab, enter the following script. Replace s3_access_logs_db with the name of your database that you created. Replace s3_access_logs with the name of your table that you created.
    Replace s3://awsexamplebucket2-logs/prefix/ with the name of the Amazon S3 path where you store your access logs. Replace s3_access_logs_db with the name of your database. Replace s3_access_logs_partitioned with the name of your partitioned table:

    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from pyspark.sql.functions import split, col, size
    from awsglue.dynamicframe import DynamicFrame
    
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    dyf = glueContext.create_dynamic_frame.from_catalog(database='s3_access_logs_db', table_name='s3_access_logs', transformation_ctx = 'dyf',additional_options = {"attachFilename": "s3path"})
    
    df = dyf.toDF()
    df2=df.withColumn('filename',split(col("s3path"),"/"))\
            .withColumn('year',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(0))\
            .withColumn('month',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(1))\
            .withColumn('day',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(2))\
            .drop('s3path','filename')
    
    output_dyf = DynamicFrame.fromDF(df2, glue_ctx=glueContext, name = 'output_dyf')
    
    partitionKeys = ['year', 'month', 'day']
    sink = glueContext.getSink(connection_type="s3", path='s3://awsexamplebucket2-logs/prefix/',
                               enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE",
                               partitionKeys=partitionKeys)
    sink.setFormat("glueparquet")
    sink.setCatalogInfo(catalogDatabase='s3_access_logs_db', catalogTableName='s3_access_logs_partitioned')
    sink.writeFrame(output_dyf)
    
    job.commit()
  5. In the Job details tab, enter the Name of your job, and then choose IAM Role.

  6. Choose Save, and then choose Run.

Note: Amazon S3 access logs are delivered regularly. To set time-based schedules for the Glue ETL job, add a trigger. Also, turn on job bookmarks.

Create a DynamicFrame object and a partitioned table

After you create the Amazon S3 access log table in Athena, create a DynamicFrame object. This object contains the Amazon S3 access logs. Based on this object, a partitioned table with keys to indicate year, month, and day is created.

  1. To create a DynamicFrame object and scan the S3 access logs table, run the following command. The attachFilename parameter is used as the column name. Replace s3_access_logs_db with the name of your database. Replace s3_access_logs_partitioned with the name of your partitioned table:

     dyf = glueContext.create_dynamic_frame.from_catalog(database='s3_access_logs_db, table_name='s3_access_logs', transformation_ctx = 'dyf',additional_options = {"attachFilename": "s3path"})
  2. To create year, month, and day columns from the path to your Amazon S3 access logs, run the following command:

    df = dyf.toDF()
    df2=df.withColumn('filename',split(col("s3path"),"/"))\
            .withColumn('year',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(0))\
            .withColumn('month',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(1))\
            .withColumn('day',split(col("filename").getItem(size(col("filename"))-1),"-").getItem(2))\
            .drop('s3path','filename')
  3. To create a partitioned table for S3 access logs, run the following command. Replace s3://awsexamplebucket2-logs/prefix/ with the name of the Amazon S3 path to the access logs. Replace s3_access_logs_db with the name of your database. Replace s3_access_logs_partitioned with the name of your partitioned table:

    output_dyf = DynamicFrame.fromDF(df2, glue_ctx=glueContext, name = 'output_dyf')
    
    partitionKeys = ['year', 'month', 'day']
    sink = glueContext.getSink(connection_type="s3", path='s3://awsexamplebucket2-logs/prefix/',
                               enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE",
                               partitionKeys=partitionKeys)
    sink.setFormat("glueparquet")
    sink.setCatalogInfo(catalogDatabase='s3_access_logs_db', catalogTableName='s3_access_logs_partitioned')
    sink.writeFrame(output_dyf)

Query the partitioned table

  1. Open the Athena console.
  2. To query the table and confirm that the table is partitioned, run the following command. Replace s3_access_logs_db with the name of your database. Replace s3_access_logs_partitioned with the name of your partitioned table. Replace 2023, 03, and 04 with your partition values:
    SELECT * FROM "s3_access_logs_db"."s3_access_logs_partitioned" WHERE year = '2023' AND month = '03' AND day = '04'
AWS OFFICIAL
AWS OFFICIALUpdated 6 months ago