Neste artigo, vamos explorar como conectar ao Azure Table Storage, realizar consultas para extrair dados específicos e, em seguida, utilizar o PySpark para manipular e exibir esses dados em um formato tabular. Este guia é ideal para desenvolvedores e engenheiros de dados que trabalham com grandes volumes de dados no Azure e querem aproveitar a flexibilidade do PySpark.
1. Configuração do Ambiente
Antes de começarmos, é necessário configurar algumas bibliotecas e informações de acesso. Neste exemplo, utilizamos as seguintes bibliotecas:
azure.data.tables: Para conectar e interagir com o Azure Table Storage.
pandas: Para manipulação de dados (caso necessário).
pyspark.sql: Para manipulação de dados utilizando PySpark.
2. Configurando as Credenciais de Acesso
Para acessar o Azure Table Storage, você precisará do nome do ambiente de acesso e da chave da conta, que são armazenados em variáveis para garantir a segurança:
account_name = "seu_ambiente_acesso"
account_key = "sua_chave"
table_name = "tabela_a_qual_deseja"
3. Conectando ao Azure Table Storage
O primeiro passo é estabelecer a conexão com o Azure Table Storage. Para isso, criamos uma connection_string e utilizamos o TableServiceClient para iniciar a conexão:
from azure.data.tables import TableServiceClient
from azure.data.tables import TableClient
def get_table_data():
connection_string = f"DefaultEndpointsProtocol=https;AccountName={account_name};AccountKey={account_key};TableEndpoint=https://{account_name}.table.core.windows.net/;"
table_service = TableServiceClient.from_connection_string(conn_str=connection_string)
table_client = table_service.get_table_client(table_name)
4. Filtrando e Selecionando Colunas
Uma vez conectado, podemos realizar consultas no Azure Table Storage. No exemplo abaixo, filtramos os dados para obter apenas as entidades que possuem PartitionKey igual a ‘2024-08-02’. Além disso, selecionamos colunas específicas para extrair apenas os dados que são relevantes para a nossa análise:
query_filter = "PartitionKey eq '2024-08-02'"
columns = ['PartitionKey','RowKey','client','depot','margin_max','max_comparable_price','min_comparable_price','product','validity_date','volume_tier']
entities = table_client.query_entities(query_filter, select=columns)
5. Convertendo Dados para DataFrame do PySpark
Após recuperar os dados do Azure Table Storage, convertemos as entidades em um DataFrame do PySpark. Isso é útil para manipular grandes volumes de dados de forma eficiente:
rows = [entity for entity in entities]
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AzureTable").getOrCreate()
df_spark = spark.createDataFrame(rows)
6. Exibindo os Dados
Finalmente, podemos exibir os dados no formato tabular utilizando o método show() do PySpark. Neste caso, optamos por mostrar até 40 registros sem truncar os valores das colunas:
df_spark.show(n=40, truncate=False)
return df_spark
# Chamando a função para obter os dados e exibir
get_table_data()
Source link
lol