Em uma postagem anterior, forneci uma introdução ao Apache Iceberg e mostrei como ele usa MinIO para armazenamento. Também mostrei como configurar uma máquina de desenvolvimento. Para fazer isso, usei o Docker Compose para instalar um contêiner Apache Spark como mecanismo de processamento, um catálogo REST e MinIO para armazenamento. Concluí com um exemplo muito simples que usou Apache Spark para ingerir dados e PyIceberg para consultar os dados. Se você é novo no Apache Iceberg ou se precisa configurar o Apache Iceberg em sua máquina de desenvolvimento, leia esta postagem introdutória .
Nesta postagem, continuarei de onde parei minha postagem anterior e investigarei um problema comum de big data: a necessidade de uma solução única para fornecer armazenamento para dados brutos, dados não estruturados e dados estruturados (dados que foram selecionados a partir de dados brutos). dados). Além disso, a mesma solução deve fornecer um mecanismo de processamento que permita relatórios eficientes em relação aos dados selecionados. Esta é a promessa dos Data Lakehouses – as capacidades dos Data Warehouses para dados estruturados e as capacidades dos Data Lakehouses para dados não estruturados – tudo numa solução centralizada.
Vejamos nosso cenário de big data com mais detalhes.
O diagrama abaixo descreve um problema comum e uma solução hipotética. Os dados chegam a um data center de vários locais e em vários formatos. O que é necessário é uma solução centralizada que permita que os dados brutos sejam transformados de modo que um mecanismo de processamento possa apoiar com eficiência a inteligência de negócios, a análise de dados e o aprendizado de máquina. Ao mesmo tempo, esta solução também deve ser capaz de armazenar dados não estruturados (texto, imagens, áudio e vídeo) para exploração de dados e aprendizagem automática. Deve também manter quaisquer dados que tenham sido transformados no seu formato original, caso uma transformação precise de ser reproduzida ou um problema de integridade de dados precise de ser investigado.
Como exemplo concreto, imagine um banco de custódia global que gere fundos mútuos para os seus clientes. Os dados que representam o livro de registros contábeis e o livro de registros de investimento de cada fundo de cada cliente são transmitidos constantemente para o Data Lakehouse de regiões geográficas de todo o mundo. A partir daí, as verificações de passagem segura precisam ocorrer (tudo o que foi enviado foi recebido) e as verificações de qualidade dos dados precisam ser executadas. Finalmente, os dados podem ser particionados e carregados em outro armazenamento que suportará relatórios de início e fim do dia.
Como alternativa, talvez este diagrama represente um cenário IOT em que estações meteorológicas enviam dados de temperatura e outros dados relacionados ao clima. Independentemente do cenário, o que é necessário é uma forma de armazenar os dados com segurança no seu formato original e depois transformar e processar quaisquer dados que precisem de ser armazenados de uma forma mais estruturada - tudo numa solução centralizada. Esta é a promessa de um Data Lakehouse – o melhor de um Data Warehouse e de um Data Lake combinados em uma solução centralizada.
Vamos tornar real a solução hipotética descrita acima. Isso está representado no diagrama abaixo.
Existem dois componentes lógicos em nosso Data Lakehouse. A primeira é uma implementação do Apache Iceberg para dados estruturados – o equivalente a um Data Warehouse. (Isso é o que desenvolvi em minha postagem anterior - portanto, não entrarei em detalhes aqui.) O segundo componente lógico é o MinIO para dados não estruturados - o lado do Data Lake do nosso Data Lakehouse. Todos os dados que chegam ao Lakehouse são entregues a esta instância lógica do MinIO. Na realidade, as duas instâncias lógicas do MinIO mostradas acima poderiam ser a mesma instância do MinIO no seu data center. Se o cluster em que você está executando o MinIO puder lidar com a ingestão de todos os dados recebidos e os requisitos de processamento do Apache Iceberg, essa implantação economizará dinheiro. Na verdade, é isso que farei neste post. Usarei um balde na instância do MinIO do Apache Iceberg para armazenar todos os dados brutos e não estruturados.
Vamos começar a brincar com os dados apresentando o conjunto de dados que usarei neste exercício e ingerindo-o no MinIO.
O conjunto de dados que experimentaremos nesta postagem é um conjunto de dados público conhecido como Resumo do Dia da Superfície Global (GSOD), que é gerenciado pela Administração Oceânica e Atmosférica Nacional (NOAA). A NOAA mantém atualmente dados de mais de 9.000 estações em todo o mundo e o conjunto de dados GSOD contém informações resumidas por dia dessas estações. Você pode baixar os dados aqui . Existe um arquivo gzip por ano. Começa em 1929 e termina em 2022 (no momento da redação deste artigo). Para construir nosso Data Lakehouse, baixei o arquivo de cada ano e coloquei-o na instância MinIO que está sendo usada em nosso Data Lakehouse. Coloquei todos os arquivos em um bucket chamado `lake.` Os dois buckets em nossa instância do MinIO são mostrados abaixo. O bucket `warehouse` foi criado quando instalamos o Apache Iceberg.
Usei o console MinIO para ingerir os dados brutos manualmente. Em um pipeline profissional, você desejará fazer isso de forma automatizada. Confira Como configurar o Kafka e transmitir dados para o MinIO no Kubernetes para ver como usar o Kafka e o Kubernetes para obter dados no MinIO.
Esses arquivos são empacotados para conveniência de download - se você tentar usá-los diretamente para criar um relatório ou gráfico, isso seria uma operação com uso intensivo de IO (e potencialmente uso intensivo da CPU). Imagine que você deseja traçar a temperatura média anual de uma estação específica. Para fazer isso, você deve abrir todos os arquivos e pesquisar em todas as linhas, procurando as entradas que correspondem à sua emissora no dia de interesse. Uma opção melhor é usar nossos recursos de Data Lakehouses para selecionar os dados e gerar relatórios sobre os dados selecionados. A primeira etapa é configurar um novo notebook Jupyter.
Primeiro, navegue até o servidor Jupyter Notebook instalado no mecanismo de processamento Apache Spark. Ele pode ser encontrado em http://localhost:8888 . Crie um novo notebook e na primeira célula adicione as importações mostradas abaixo. (Todos os cadernos preenchidos criados nesta postagem podem ser encontrados aqui .)
from collections import namedtuple import csv import json import logging import tarfile from time import time from typing import List from minio import Minio from minio.error import S3Error import pandas as pd import pyarrow as pa import pyarrow.parquet as pq pd.options.mode.chained_assignment = None bucket_name = 'lake'
Observe que estamos importando a biblioteca MinIO. O notebook que estamos construindo é um pipeline ETL de armazenamento não estruturado (MinIO Data Lake) para armazenamento estruturado (Apache Iceberg, que usa MinIO nos bastidores).
Agora, podemos criar um banco de dados e uma tabela Iceberg para nossos dados.
Criar o banco de dados e a tabela para o conjunto de dados GSOD é simples. O script abaixo criará o banco de dados que chamaremos de `noaa`. Adicione isso em uma célula após as importações.
%%sql CREATE DATABASE IF NOT EXISTS noaa;
O script abaixo criará a tabela gsod
.
%%sql CREATE TABLE IF NOT EXISTS noaa.gsod ( station string, date timestamp, latitude double, longitude double, name string, temp double ) USING iceberg PARTITIONED BY (station)
Ao brincar com o Apache Iceberg, muitas vezes você vai querer largar uma mesa para poder recomeçar um experimento. O script abaixo eliminará a tabela gsod
caso você deseje alterar alguma coisa em sua configuração.
%%sql DROP TABLE IF EXISTS noaa.gsod;
Agora que temos os arquivos zip brutos com base no ano em nosso Lakehouse, podemos extraí-los, transformá-los e carregá-los em nosso Data Lakehouse. Vamos apresentar algumas funções auxiliares primeiro. A função abaixo retornará uma lista de objetos MinIO em um bucket especificado que corresponde a um prefixo.
def get_object_list(bucket_name: str, prefix: str) -> List[str]: ''' Gets a list of objects from a bucket. ''' logger = logging.getLogger('gsod_logger') logger.setLevel(logging.INFO) # Load the credentials and connection information. with open('credentials.json') as f: credentials = json.load(f) # Get data of an object. try: # Create client with access and secret key client = Minio(credentials['url'], # host.docker.internal credentials['accessKey'], credentials['secretKey'], secure=False) object_list = [] objects = client.list_objects(bucket_name, prefix=prefix, recursive=True) for obj in objects: object_list.append(obj.object_name) except S3Error as s3_err: logger.error(f'S3 Error occurred: {s3_err}.') raise s3_err except Exception as err: logger.error(f'Error occurred: {err}.') raise err return object_list
Observe que no código acima, é necessário um arquivo de credenciais MinIO. Isso pode ser obtido no console MinIO. Se você não sabe como obter credenciais MinIO, há uma seção nesta postagem que mostra como gerá-las e baixá-las.
A seguir, precisamos de uma função para obter um objeto do MinIO. Como os objetos são arquivos tar, também precisamos desta função para extrair dados do arquivo tar e transformá-los em um DataFrame do Pandas. Isso é feito usando a função abaixo.
def tar_to_df(bucket_name: str, object_name: str) -> pd.DataFrame: ''' This function will take a tarfile reference in MinIO and do the following: - unzip the tarfile - turn the data into a single DataFrame object ''' logger = logging.getLogger('gsod_logger') logger.setLevel(logging.INFO) # Temp file to use for processing the tar files. temp_file_name = 'temp.tar.gz' # Load the credentials and connection information. with open('credentials.json') as f: credentials = json.load(f) # Get data of an object. try: # Create client with access and secret key client = Minio(credentials['url'], # host.docker.internal credentials['accessKey'], credentials['secretKey'], secure=False) object_info = client.fget_object(bucket_name, object_name, temp_file_name) Row = namedtuple('Row', ('station', 'date', 'latitude', 'longitude', 'elevation', 'name', 'temp', 'temp_attributes', 'dewp', 'dewp_attributes', 'slp', 'SLP_attributes', 'stp', 'stp_attributes', 'visib', 'visib_attributes', 'wdsp', 'wdsp_attributes', 'mxspd', 'gust', 'max', 'max_attributes', 'min', 'min_attributes', 'prcp', 'prcp_attributes', 'sndp', 'frshtt')) # Columns of interest and their data types. dtypes={ 'station': 'string', 'date': 'datetime64[ns]', 'latitude': 'float64', 'longitude': 'float64', 'name': 'string', 'temp': 'float64' } tar = tarfile.open(temp_file_name, 'r:gz') all_rows = [] for member in tar.getmembers(): member_handle = tar.extractfile(member) byte_data = member_handle.read() decoded_string = byte_data.decode() lines = decoded_string.splitlines() reader = csv.reader(lines, delimiter=',') # Get all the rows in the member. Skip the header. _ = next(reader) file_rows = [Row(*l) for l in reader] all_rows += file_rows df = pd.DataFrame.from_records(all_rows, columns=Row._fields) df = df[list(dtypes.keys())] for c in df.columns: if dtypes[c] == 'float64': df[c] = pd.to_numeric(df[c], errors='coerce') df = df.astype(dtype=dtypes) except S3Error as s3_err: logger.error(f'S3 Error occurred: {s3_err}.') raise s3_err except Exception as err: logger.error(f'Error occurred: {err}.') raise err return df
Ambas as funções são utilitários genéricos que podem ser reutilizados independentemente do que você estiver fazendo com o MinIO. Considere colocá-los em sua coleção pessoal de trechos de código ou no Github Gist da sua organização.
Agora, estamos prontos para enviar dados para o armazém de nossa Lakehouse. Isso pode ser feito com o código abaixo, que inicia uma sessão Spark, percorre todos os arquivos tar GSOD, extrai, transforma e envia para nossa tabela Iceberg.
from pyspark.sql import SparkSession spark = SparkSession.builder.appName('Jupyter').getOrCreate() objects = get_object_list(bucket_name, 'noaa/gsod') for obj in reversed(objects): print(obj) df = tar_to_df(bucket_name, obj) table = pa.Table.from_pandas(df) pq.write_table(table, 'temp.parquet') df = spark.read.parquet('temp.parquet') df.write.mode('append').saveAsTable('noaa.gsod')
O código nesta seção carregou manualmente os dados de um bucket MinIO. Em um ambiente de produção, você desejará implantar esse código em um serviço e usar MinIO Bucket Events para ingestão automatizada.
Vamos começar um novo notebook para relatórios. A célula abaixo importa os utilitários que precisaremos. Especificamente, usaremos PyIceberg para recuperação de dados, Pandas para organização de dados e Seaborn para visualização de dados.
from pyiceberg.catalog import load_catalog from pyiceberg.expressions import GreaterThanOrEqual, EqualTo import pandas as pd import seaborn as sns pd.options.mode.chained_assignment = None catalog = load_catalog('default')
O que queremos fazer é calcular a temperatura média anual para uma determinada estação meteorológica. Isso nos dá um número por ano e leva em consideração todas as estações do ano. O primeiro passo é consultar o Iceberg para obter todos os dados de uma determinada estação. Isso é feito abaixo usando PyIceberg.
tbl = catalog.load_table('noaa.gsod') sc = tbl.scan(row_filter="station == '72502014734'") df = sc.to_arrow().to_pandas() df.head(10)
O ID da estação usado no código acima é para uma estação localizada no Aeroporto Internacional Newark Liberty, NJ, EUA. Está operacional desde 1973 (quase 50 anos de dados). Quando o código for executado, você obterá a saída abaixo. (Estou usando a função DataFrame head() para obter uma amostra.)
A seguir, precisamos agrupar por ano e calcular a média. Usando Pandas, são algumas linhas de código. Nenhum loop é necessário.
df['year'] = df['date'].dt.year df = df[['year','temp']] grouped_by_year = df.groupby('year') average_by_year = grouped_by_year.mean() average_by_year
Assim que esta célula for executada, você verá um único valor para cada ano. Os principais anos são mostrados abaixo.
Finalmente, podemos visualizar nossas médias anuais. Usaremos Seaborn para criar um gráfico de linha. Isso leva apenas uma linha de código.
sns.lineplot(data=df, x="year", y="temp", errorbar=None)
O gráfico de linha é mostrado abaixo.
Outro comando que você deve sempre executar após executar um relatório pela primeira vez está abaixo.
[task.file.file_path for task in sc.plan_files()]
Esta é uma compreensão de lista que fornecerá uma lista de todos os arquivos de dados no Apache Iceberg que possuem dados que correspondem à sua consulta. Haverá muitos, embora os metadados do Iceberg possam filtrar muitos. Ver todos os arquivos envolvidos deixa claro que o armazenamento de objetos em alta velocidade é uma parte importante de uma Lakehouse.
Neste post, construímos um Data Lakehouse usando MinIO e Apache Iceberg. Fizemos isso usando o conjunto de dados GSOD. Primeiro, os dados brutos foram carregados no lado Lake do nosso Data Lakehouse (MinIO). A partir daí, criamos um banco de dados e uma tabela no Apache Iceberg (o lado Data Warehouse do nosso Data Lakehouse). Em seguida, construímos um pipeline ETL simples para mover dados do Lake para o Warehouse dentro do Data Lakehouse.
Depois que o Apache Iceberg foi totalmente preenchido com dados, pudemos criar um relatório de temperatura média anual e visualizá-lo.
Tenha em mente que se quiser construir um Data Lakehouse em produção, você precisará dos recursos empresariais do MinIO. Considere examinar o gerenciamento do ciclo de vida de objetos , práticas recomendadas de segurança , streaming de Kafka e eventos de bucket .
Também publicado aqui .