Comment puis-je analyser les journaux AWS WAF dans Athena ?
Je souhaite interroger les journaux AWS WAF dans Amazon Athena.
Résolution
Pour interroger les journaux AWS WAF dans Athena, créez une base de données et un schéma de table dans Amazon Simple Storage Service (Amazon S3). Puis, utilisez les exemples de requêtes pour obtenir les informations requises à partir de vos journaux.
Créer une base de données et une table dans Amazon S3
-
Activez la journalisation de la liste de contrôle d'accès Web (liste ACL Web) pour votre compartiment Amazon S3. Copiez les valeurs de Compartiment cible et Préfixe cible dans un fichier texte à utiliser dans le schéma de table. Ces valeurs indiquent l'emplacement Amazon S3 dans votre requête Athena.
-
Ouvrez la console Athena.
Remarque : Avant d'exécuter votre première requête, créez un compartiment S3 pour l'emplacement des résultats de votre requête. -
Dans l'Éditeur de requête, exécutez CREATE DATABASE pour créer une base de données :
CREATE DATABASE waf_logs_db
Remarque : Il est recommandé de créer la base de données dans la même région AWS que celle du compartiment S3.
-
Créez un schéma de table pour les journaux AWS WAF dans Athena. L'exemple suivant est une requête de modèle de table avec projection de partition :
CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array < struct < rulegroupid: string, terminatingrule: struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > >, nonterminatingmatchingrules: array < struct < ruleid: string, action: string, overriddenaction: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > > >, excludedrules: string > >, `ratebasedrulelist` array < struct < ratebasedruleid: string, limitkey: string, maxrateallowed: int > >, `nonterminatingmatchingrules` array < struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, `requestheadersinserted` array < struct < name: string, value: string > >, `responsecodesent` string, `httprequest` struct < clientip: string, country: string, headers: array < struct < name: string, value: string > >, uri: string, args: string, httpversion: string, httpmethod: string, requestid: string >, `labels` array < struct < name: string > >, `captcharesponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `challengeresponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `ja3Fingerprint` string ) PARTITIONED BY ( `region` string, `date` string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/AWSLogs/accountID/WAFLogs/region/DOC-EXAMPLE-WEBACL/' TBLPROPERTIES( 'projection.enabled' = 'true', 'projection.region.type' = 'enum', 'projection.region.values' = 'us-east-1,us-west-2,eu-central-1,eu-west-1', 'projection.date.type' = 'date', 'projection.date.range' = '2021/01/01,NOW', 'projection.date.format' = 'yyyy/MM/dd', 'projection.date.interval' = '1', 'projection.date.interval.unit' = 'DAYS', 'storage.location.template' = 's3://DOC-EXAMPLE-BUCKET/AWSLogs/accountID/WAFLogs/${region}/DOC-EXAMPLE-WEBACL/${date}/')
Remarque : Remplacez storage.location.template, projection.region.values, projection.date.range, DOC-EXAMPLE-BUCKET et DOC-EXAMPLE-WEBACL par vos valeurs.
L'exemple suivant est une requête de modèle de table sans projection de partition :
CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array < struct < rulegroupid: string, terminatingrule: struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > >, nonterminatingmatchingrules: array < struct < ruleid: string, action: string, overriddenaction: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > > >, excludedrules: string > >, `ratebasedrulelist` array < struct < ratebasedruleid: string, limitkey: string, maxrateallowed: int > >, `nonterminatingmatchingrules` array < struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, `requestheadersinserted` array < struct < name: string, value: string > >, `responsecodesent` string, `httprequest` struct < clientip: string, country: string, headers: array < struct < name: string, value: string > >, uri: string, args: string, httpversion: string, httpmethod: string, requestid: string >, `labels` array < struct < name: string > >, `captcharesponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `challengeresponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `ja3Fingerprint` string ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/'
Remarque : Remplacez DOC-EXAMPLE-BUCKET par le nom de votre compartiment S3.
-
Dans le volet de navigation, sous Tables, sélectionnez Aperçu de la table. Vérifiez que les données AWS WAF, telles que formatversion, webaclid, httpsourcename et ja3Fingerprint figurent dans la table.
Analyser vos journaux AWS WAF dans Athena
Pour analyser vos fichiers journaux AWS WAF, utilisez les exemples de requêtes suivants. Vous pouvez également créer vos propres requêtes.
Compter les adresses IP correspondantes qui s’alignent sur les règles exclues au cours des 10 derniers jours
Exécutez la commande suivante :
WITH test_dataset AS (SELECT * FROM waf_logs CROSS JOIN UNNEST(rulegrouplist) AS t(allrulegroups)) SELECT COUNT(*) AS count, "httprequest"."clientip", "allrulegroups"."excludedrules", "allrulegroups"."ruleGroupId" FROM test_dataset WHERE allrulegroups.excludedrules IS NOT NULL AND from_unixtime(timestamp/1000) > now() - interval '10' day GROUP BY "httprequest"."clientip", "allrulegroups"."ruleGroupId", "allrulegroups"."excludedrules" ORDER BY count DESC
Remarque : Remplacez 10 par votre trame.
Renvoyer des enregistrements pour une plage de dates et une adresse IP données
Exécutez la commande suivante :
SELECT * FROM waf_logs WHERE httprequest.clientip='192.168.0.0' AND "date" >= '2022/03/01' AND "date" < '2022/03/31'
Remplacez 192.168.0.0 par votre adresse IP et 2022/03/01 et 2022/03/31 par vos dates.
Compter le nombre de fois qu'une requête a été bloquée, groupée par attributs spécifiques
Exécutez la commande suivante :
SELECT COUNT(*) AS count, webaclid, terminatingruleid, httprequest.clientip, httprequest.uri FROM waf_logs WHERE action='BLOCK' GROUP BY webaclid, terminatingruleid, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;
Remarque : Remplacez webaclid, terminatingruleid, httprequest.clientip et httprequest.uri par vos valeurs et 100 par le nombre maximal de résultats souhaités.
Regrouper toutes les règles personnalisées comptabilisées par nombre de fois qu'elles ont été mises en correspondance
Exécutez la commande suivante :
SELECT count(*) AS count, httpsourceid, httprequest.clientip, t.ruleid, t.action FROM "waf_logs" CROSS JOIN UNNEST(nonterminatingmatchingrules) AS t(t) WHERE action <> 'BLOCK' AND cardinality(nonTerminatingMatchingRules) > 0 GROUP BY t.ruleid, t.action, httpsourceid, httprequest.clientip ORDER BY "count" DESC Limit 50
Exécuter une requête avec une adresse IP de filtre
Exécutez la commande suivante :
SELECT * FROM "waf_logs_db"."waf_logs" where httprequest.clientip='192.168.0.0' limit 10;
Remarque : Remplacez 192.168.0.0 par votre adresse IP et 10 par le nombre maximal de résultats souhaité.
Sélectionner un horodatage où la requête ne comporte pas d'en-tête d'origine, de chaîne d'agent utilisateur de navigateur ou de cookies
Exécutez la commande suivante :
SELECT datestamp, element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'origin'), 1).value IS NULL AS MissingOrigin, httprequest.clientip, element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'user-agent'), 1).value AS UserAgent, element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'cookie'), 1).value AS Cookie from "waf_logs_db"."waf_logs" where webaclname = 'production-web' AND datestamp >= '2021/01/01' AND httprequest.uri = '/uri/path' AND httprequest.httpmethod = 'POST' order by 1 desc
Remarque : Remplacez production-web par votre liste ACL Web et 2021/01/01 par votre date.
Compter et trier les enregistrements selon une colonne spécifique
L'exemple de requête suivant compte et trie les enregistrements en fonction des colonnes User-Agent et Chemin URI. Elle exclut également les méthodes et requêtes HTTP spécifiques avec un en-tête de requête d'origine.
Exécutez la commande suivante :
SELECT count() AS Count, element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'user-agent'), 1).value AS useragent, httprequest.uri from "db"."waf_logs" where webaclname = 'production-web' AND httprequest.httpmethod != 'GET' AND httprequest.httpmethod != 'HEAD' AND element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'origin'), 1).value IS NULL AND datestamp >= '2021/01/01' group by 2,3 ORDER BY 1 desc
Remarque : Remplacez production-web par votre liste ACL Web et 2021/01/01 par votre date.
Pour plus d'informations, reportez-vous à la section Interrogation des journaux AWS WAF.
Informations connexes
Étape 1 : Créer une base de données
Résolution de problèmes dans Athena
Comment puis-je utiliser Amazon Athena pour analyser les journaux d’accès à mon serveur Amazon S3 ?
Contenus pertinents
- demandé il y a un moislg...
- Réponse acceptéedemandé il y a un anlg...
- demandé il y a un anlg...
- demandé il y a 22 jourslg...
- demandé il y a un anlg...
- AWS OFFICIELA mis à jour il y a 4 ans
- AWS OFFICIELA mis à jour il y a 4 mois
- AWS OFFICIELA mis à jour il y a 9 mois
- AWS OFFICIELA mis à jour il y a 8 mois