Como uso dados dinâmicos após uma transformação relacional do AWS Glue?
Quero usar a transformação de relacionalização do AWS Glue para nivelar meus dados. Quais campos posso usar como partições para armazenar os dados dinâmicos no Amazon Simple Storage Service (Amazon S3)?
Descrição breve
A transformação de relacionalização possibilita o uso de estruturas de dados NoSQL, como matrizes e estruturas, em bancos de dados relacionais. A transformação de relacionalização retorna uma coleção de DynamicFrames (uma DynamicFrameCollection em Python e uma matriz em Scala). Todos os DynamicFrames retornados por uma transformação de relacionalização podem ser acessados por meio de seus nomes individuais em Python e por meio de índices de matriz em Scala.
Resolução
Relacionalizar os dados
Este tutorial usa o seguinte esquema:
|-- family_name: string |-- name: string |-- gender: string |-- image: string |-- images: array | |-- element: struct | | |-- url: string
Use a seguinte sintaxe de relacionalização para Python:
# AWS Glue Data Catalog: database and table names db_name = "us-legislators" tempDir = "s3://awsexamplebucket/temp_dir/" # Create dynamic frames from the source tables persons = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_persons) # Relationalize transformation dfc = persons.relationalize("root", tempDir) dfc.select('root_images').printSchema() dfc.select('root_images').show()
Use a seguinte sintaxe de relacionalização para Scala:
// AWS Glue Data Catalog: database and table names val dbName = "us-legislators" val tblPersons = "persons_json" // Output Amazon S3 temp directory val tempDir = "s3://awsexamplebucket/temp_dir" val persons: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblPersons).getDynamicFrame() val personRelationalize = persons.relationalize(rootTableName = "root", stagingPath = tempDir) personRelationalize(2).printSchema() personRelationalize(2).show()
Interpretar os dados dinâmicos
Essa transformação de relacionalização produz dois esquemas: root e root_images.
root:
|-- family_name: string |-- name: string |-- gender: string |-- image: string |-- images: long
root_images:
|-- id: long |-- index: int |-- images.val.url: string
- id: ordem do elemento da matriz (1, 2 ou 3)
- index: posição do índice para cada elemento em uma matriz
- images.val.url: valor para images.val.url em root_images
Esses são os únicos campos que podem ser usados como campos de partição para armazenar esses dados dinâmicos no Amazon S3. Especificar campos da tabela root, como name, não funciona porque esses campos não existem em root_images.
Juntar os dados relacionalizados para obter os dados normalizados
O atributo id em root_images é a ordem das matrizes (1, 2 ou 3) no conjunto de dados. O atributo images em root contém o valor do índice de matrizes. Isso significa que você deve usar images e id para unir root e root_images. É possível executar dynamicFrame.show() para verificar a ordem das matrizes e o valor do índice de matrizes.
Para unir root e root_images:
Python:
joined_root_root_images = Join.apply(dfc.select('root'), dfc.select('root_images'), 'images', 'id')
Scala:
val joined_root_root_images = personRelationalize(0).join(keys1 = Seq("images"), keys2 = Seq("id"), frame2 = personRelationalize(1))
Armazenar os dados dinâmicos
Para armazenar os dados dinâmicos no Amazon S3 com partições:
Python:
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir,"partitionKeys":["id"]}, format = "csv",transformation_ctx = "datasink4")
Scala:
**Observação:**no exemplo a seguir, personRelationalize(2) é a tabela de dados dinâmica root_images.
glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> paths, "partitionKeys" -> List("id"))), format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))
Para armazenar os dados dinâmicos no Amazon S3 sem partições:
Python:
datasink5 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir}, format = "csv",transformation_ctx = "datasink5"
Scala:
**Observação:**no exemplo a seguir, personRelationalize(2) é a tabela de dados dinâmica root_images.
glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> paths)), format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))
Depois de gravar os dados no Amazon S3, consulte-os no Amazon Athena ou use um DynamicFrame para gravá-los em um banco de dados relacional, como o Amazon Redshift.
Informações relacionadas
Simplificar consultas a JSON aninhado com a transformação de relacionalização do AWS Glue
Conteúdo relevante
- AWS OFICIALAtualizada há 2 anos
- AWS OFICIALAtualizada há 2 meses
- AWS OFICIALAtualizada há um mês