Athena에서 AWS WAF 로그를 분석하려면 어떻게 해야 합니까?
Amazon Athena에서 AWS WAF 로그를 쿼리하려고 합니다.
해결 방법
Athena에서 AWS WAF 로그를 쿼리하려면 Amazon Simple Storage Service(Amazon S3)에서 데이터베이스 및 테이블 스키마를 생성합니다. 그런 다음, 샘플 쿼리를 사용하여 로그에서 필요한 정보를 가져옵니다.
Amazon S3에 데이터베이스 및 테이블 생성
-
Amazon S3 버킷에 대한 웹 액세스 제어 목록(웹 ACL) 로깅을 켭니다. Target bucket(대상 버킷) 및 **Target prefix(대상 접두사)**의 값을 텍스트 파일에 복사하여 테이블 스키마에서 사용합니다. 이 값은 Athena 쿼리에서 Amazon S3 위치를 지정합니다.
-
Athena 콘솔을 엽니다.
참고: 첫 번째 쿼리를 실행하기 전에 쿼리 결과 위치에 대한 S3 버킷을 생성하십시오. -
쿼리 편집기에서 CREATE DATABASE를 실행하여 데이터베이스를 생성합니다.
CREATE DATABASE waf_logs_db
참고: Amazon S3 버킷과 동일한 AWS 리전에 데이터베이스를 생성하는 것이 가장 좋습니다.
-
Athena에서 AWS WAF 로그에 대한 테이블 스키마를 생성합니다. 다음 예는 파티션 프로젝션이 있는 테이블 템플릿 쿼리입니다.
CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array < struct < rulegroupid: string, terminatingrule: struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > >, nonterminatingmatchingrules: array < struct < ruleid: string, action: string, overriddenaction: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > > >, excludedrules: string > >, `ratebasedrulelist` array < struct < ratebasedruleid: string, limitkey: string, maxrateallowed: int > >, `nonterminatingmatchingrules` array < struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, `requestheadersinserted` array < struct < name: string, value: string > >, `responsecodesent` string, `httprequest` struct < clientip: string, country: string, headers: array < struct < name: string, value: string > >, uri: string, args: string, httpversion: string, httpmethod: string, requestid: string >, `labels` array < struct < name: string > >, `captcharesponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `challengeresponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `ja3Fingerprint` string ) PARTITIONED BY ( `region` string, `date` string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/AWSLogs/accountID/WAFLogs/region/DOC-EXAMPLE-WEBACL/' TBLPROPERTIES( 'projection.enabled' = 'true', 'projection.region.type' = 'enum', 'projection.region.values' = 'us-east-1,us-west-2,eu-central-1,eu-west-1', 'projection.date.type' = 'date', 'projection.date.range' = '2021/01/01,NOW', 'projection.date.format' = 'yyyy/MM/dd', 'projection.date.interval' = '1', 'projection.date.interval.unit' = 'DAYS', 'storage.location.template' = 's3://DOC-EXAMPLE-BUCKET/AWSLogs/accountID/WAFLogs/${region}/DOC-EXAMPLE-WEBACL/${date}/')
참고: storage.location.template, projection.region.values, projection.date.range, DOC-EXAMPLE-BUCKET 및 DOC-EXAMPLE-WEBACL을 원하는 값으로 바꾸십시오.
다음 예는 파티션 프로젝션이 없는 테이블 템플릿 쿼리입니다.
CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array < struct < rulegroupid: string, terminatingrule: struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > >, nonterminatingmatchingrules: array < struct < ruleid: string, action: string, overriddenaction: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > > >, excludedrules: string > >, `ratebasedrulelist` array < struct < ratebasedruleid: string, limitkey: string, maxrateallowed: int > >, `nonterminatingmatchingrules` array < struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, `requestheadersinserted` array < struct < name: string, value: string > >, `responsecodesent` string, `httprequest` struct < clientip: string, country: string, headers: array < struct < name: string, value: string > >, uri: string, args: string, httpversion: string, httpmethod: string, requestid: string >, `labels` array < struct < name: string > >, `captcharesponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `challengeresponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `ja3Fingerprint` string ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/'
참고: DOC-EXAMPLE-BUCKET을 S3 버킷 이름으로 바꾸십시오.
-
탐색 창의 **Tables(테이블)**에서 **Preview table(테이블 미리 보기)**을 선택합니다. formatversion, webaclid, httpsourcename, ja3Fingerprint와 같은 AWS WAF 데이터가 테이블에 있는지 확인합니다.
Athena에서 AWS WAF 로그 분석
AWS WAF 로그 파일을 분석하려면 다음 예제 쿼리를 사용합니다. 쿼리를 직접 만들 수도 있습니다.
지난 10일 동안 제외된 규칙과 일치하는 IP 주소 계산
다음 명령을 실행합니다.
WITH test_dataset AS (SELECT * FROM waf_logs CROSS JOIN UNNEST(rulegrouplist) AS t(allrulegroups)) SELECT COUNT(*) AS count, "httprequest"."clientip", "allrulegroups"."excludedrules", "allrulegroups"."ruleGroupId" FROM test_dataset WHERE allrulegroups.excludedrules IS NOT NULL AND from_unixtime(timestamp/1000) > now() - interval '10' day GROUP BY "httprequest"."clientip", "allrulegroups"."ruleGroupId", "allrulegroups"."excludedrules" ORDER BY count DESC
참고: 10을 원하는 시간 프레임으로 바꾸십시오.
지정된 날짜 범위 및 IP 주소에 대한 레코드 반환
다음 명령을 실행합니다.
SELECT * FROM waf_logs WHERE httprequest.clientip='192.168.0.0' AND "date" >= '2022/03/01' AND "date" < '2022/03/31'
192.168.0.0을 IP 주소로 바꾸고 2022/03/01 및 2022/03/31을 원하는 날짜로 바꿉니다.
요청이 차단된 횟수를 특정 속성별로 그룹화하여 계산
다음 명령을 실행합니다.
SELECT COUNT(*) AS count, webaclid, terminatingruleid, httprequest.clientip, httprequest.uri FROM waf_logs WHERE action='BLOCK' GROUP BY webaclid, terminatingruleid, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;
참고: webaclid, terminatingruleid, httprequest.clientip 및 httprequest.uri를 원하는 값으로 바꾸고 100을 원하는 최대 결과 수로 바꾸십시오.
계산된 모든 사용자 지정 규칙을 일치된 횟수별로 그룹화
다음 명령을 실행합니다.
SELECT count(*) AS count, httpsourceid, httprequest.clientip, t.ruleid, t.action FROM "waf_logs" CROSS JOIN UNNEST(nonterminatingmatchingrules) AS t(t) WHERE action <> 'BLOCK' AND cardinality(nonTerminatingMatchingRules) > 0 GROUP BY t.ruleid, t.action, httpsourceid, httprequest.clientip ORDER BY "count" DESC Limit 50
필터 IP 주소로 쿼리 실행
다음 명령을 실행합니다.
SELECT * FROM "waf_logs_db"."waf_logs" where httprequest.clientip='192.168.0.0' limit 10;
참고: 192.168.0.0을 IP 주소로 바꾸고 10을 원하는 최대 결과 수로 바꾸십시오.
요청에 원래 헤더, 브라우저 사용자 에이전트 문자열 또는 쿠키가 없는 날짜스탬프 선택
다음 명령을 실행합니다.
SELECT datestamp, element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'origin'), 1).value IS NULL AS MissingOrigin, httprequest.clientip, element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'user-agent'), 1).value AS UserAgent, element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'cookie'), 1).value AS Cookie from "waf_logs_db"."waf_logs" where webaclname = 'production-web' AND datestamp >= '2021/01/01' AND httprequest.uri = '/uri/path' AND httprequest.httpmethod = 'POST' order by 1 desc
참고: production-web을 웹 ACL로 바꾸고 2021/01/01을 원하는 날짜로 바꾸십시오.
특정 열을 기준으로 레코드 수 계산 및 정렬
다음 예제 쿼리에서는 User-Agent 및 URI path(URI 경로) 열을 기반으로 레코드 수를 계산하고 레코드를 정렬합니다. 또한 특정 HTTP 메서드와 원래 요청 헤더가 있는 요청을 제외시킵니다.
다음 명령을 실행합니다.
SELECT count() AS Count, element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'user-agent'), 1).value AS useragent, httprequest.uri from "db"."waf_logs" where webaclname = 'production-web' AND httprequest.httpmethod != 'GET' AND httprequest.httpmethod != 'HEAD' AND element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'origin'), 1).value IS NULL AND datestamp >= '2021/01/01' group by 2,3 ORDER BY 1 desc
참고: production-web을 웹 ACL로 바꾸고 2021/01/01을 원하는 날짜로 바꾸십시오.
자세한 내용은 AWS WAF 로그 쿼리를 참조하십시오.
관련 정보
관련 콘텐츠
- 질문됨 일 년 전lg...
- 질문됨 6년 전lg...
- AWS 공식업데이트됨 3달 전
- AWS 공식업데이트됨 6달 전