paint-brush
Como estou construindo um serviço de IA para análisepor@pro1code1hack
615 leituras
615 leituras

Como estou construindo um serviço de IA para análise

por Yehor Dremliuha12m2024/05/23
Read on Terminal Reader

Muito longo; Para ler

Neste artigo quero compartilhar minha experiência no desenvolvimento de um serviço de IA para uma plataforma de análise web, chamada Swetrix. Meu objetivo era desenvolver um modelo de aprendizado de máquina que pudesse prever o tráfego futuro do site com base nos dados exibidos na captura de tela a seguir. O objetivo final é ter uma visão clara para o cliente sobre qual tráfego aparecerá em seu site no futuro.
featured image - Como estou construindo um serviço de IA para análise
Yehor Dremliuha HackerNoon profile picture
0-item
1-item

Neste artigo quero compartilhar minha experiência no desenvolvimento de um serviço de IA para uma plataforma de análise web, chamada Swetrix.


Meu objetivo era desenvolver um modelo de aprendizado de máquina que pudesse prever o tráfego futuro do site com base nos dados exibidos na captura de tela a seguir

Figura 1 – O projeto

O objetivo final é ter uma visão clara para o cliente de qual tráfego aparecerá em seu site no futuro, permitindo-lhe obter melhores insights e aprimorar o planejamento de negócios em geral.

2. Requisitos e Arquitetura

Durante o planejamento foi tomada a decisão de prosseguir com a Arquitetura de Microsserviços com o corretor de mensagens RabbitMQ para comunicação entre serviços de IA e API.


Figura 2 – Arquitetura


Primeiro de tudo, precisamos coletar dados com uma tarefa cron de hora em hora em um banco de dados separado. Decidimos escolher um ClickHouse, pois nele são armazenados os dados originais dos sites do Swetrix. Detalhes sobre o formato serão abordados nas próximas seções.


RabbitMQ foi escolhido como corretor de mensagens devido à sua simplicidade e precisamos estabelecer uma comunicação entre serviços de IA e API. Vamos analisar tudo e verificar a lógica principal

Serviço Swetrix-API:

  • Reúne estatísticas de dados de hora em hora por meio do Cron Task e envia dados brutos para o serviço de IA.
  • Insere e recebe dados pré-processados do ClickHouse.

Serviço Swetrix-AI:

  • Processa os dados brutos e as preferências selecionadas (intervalo e subcategoria) para previsão.
  • Converte os dados de previsão em formato JSON e os envia de volta ao serviço API via RabbitMQ.


O serviço Swetrix-AI usará a estrutura NestJs para o backend e scripts Python para pré-processamento de dados e previsões de modelo.

3. Pré-processamento

Reunimos os seguintes dados sobre projetos em uma tabela analytics . Figura 3 – Dados brutos no banco de dados Você já viu a versão renderizada desses dados na primeira seção do artigo.

Consegui alcançar esse resultado (quase aceitável) com a seguinte consulta:

 @Cron(CronExpression.EVERY_HOUR) async insertHourlyProjectData(): Promise<void> { const gatherProjectsData = ` INSERT INTO analytics.hourly_projects_data (UniqueID, projectID, statisticsGathered, br_keys, br_vals, os_keys, os_vals, lc_keys, lc_vals, ref_keys, ref_vals, so_keys, so_vals, me_keys, me_vals, ca_keys, ca_vals, cc_keys, cc_vals, dv_keys, dv_vals, rg_keys, rg_vals, ct_keys, ct_vals) SELECT generateUUIDv4() as UniqueID, pid as projectID, toStartOfHour(now()) as statisticsGathered, groupArray(br) as br_keys, groupArray(br_count) as br_vals, groupArray(os) as os_keys, groupArray(os_count) as os_vals, ... groupArray(ct) as ct_keys, groupArray(ct_count) as ct_vals FROM ( SELECT pid, br, count(*) as br_count, os, count(*) as os_count, ... ct, count(*) as ct_count FROM analytics.analytics GROUP BY pid, br, os, lc, ref, so, me, ca, cc, dv, rg, ct ) GROUP BY pid; ` try { await clickhouse.query(gatherProjectsData).toPromise() } catch (e) { console.error( `[CRON WORKER] Error whilst gathering hourly data for all projects: ${e}`, )

A função está programada para ser executada a cada hora usando um Cron Job. Ele coleta e insere dados analíticos em uma clickhouse analytics.hourly_projects_data .

Saída

Figura 4 – Dados processados
Devido às limitações do ClickHouse não consegui obter o formato desejado dos dados. Por isso decidi usar pandas para completar o pré-processamento, necessário para o treinamento do modelo.


Especificamente, usei Python para fazer o seguinte:

3.1 Combinar chaves e valores

Combine chaves e valores relacionados a uma categoria em um campo JSON, por exemplo, combinando chaves e valores de dispositivos em um objeto como tal.

 os_keys = {“Windows”, ”MacOS”, ”MacOS”, ”MacOS”, ”Linux”} os_values = {1, 2, 2, 1, 5}

Em:

 os = {“Windows”: 1, “MacOS”: 5, “Linux”: 5}

Anexando o código e a saída:

 def format_data(keys_list, vals_list, threshold): """ Format data by converting string representations of lists to actual lists, then sums up the counts for each key. Keys with counts below a specified threshold are aggregated into 'Other'. """ counts = defaultdict(int) for keys_str, vals_str in zip(keys_list, vals_list): keys = ast.literal_eval(keys_str) vals = ast.literal_eval(vals_str) for key, val in zip(keys, vals): counts[key] += val final_data = defaultdict(int) for value, count in counts.items(): final_data[value] = count return dict(final_data) def process_group(group): """ Combine specific groups by a group clause, and make a """ result = {} for col in group.columns: if col.endswith('_keys'): prefix = col.split('_')[0] # Extract prefix to identify the category (eg, 'br' for browsers) threshold = other_thresholds.get(prefix, 1) # Get the threshold for this category, default to 1 vals_col = col.replace('_keys', '_vals') keys_list = group[col].tolist() vals_list = group[vals_col].tolist() result[col.replace('_keys', '')] = format_data(keys_list, vals_list, threshold) return pd.Series(result)


Este formato de dados não será usado para a previsão em si, eu diria, é mais para armazená-los no banco de dados e para fins de depuração, para verificar se não há valores faltantes e, além disso, para verificar se o modelo produz um resultado preciso. resultado.

Saída
Figura 5 – Representação de Pandas de Dados Armazenados 3.2 Combinar chaves e valores

Para treinar um modelo adequado decidi definir outros grupos para diversas categorias. O que significa que se globalmente o número de instâncias de um grupo em uma categoria específica estiver abaixo de um determinado percentual (%), ele será adicionado como parte do outro.


Por exemplo, na categoria os temos:

 {“MacOS”: 300, “Windows”: 400, “Linux”: 23 and “TempleOS”: 10}

Como tanto o Linux quanto o TempleOS, neste caso, são extremamente raros, eles serão combinados em outro grupo , portanto o resultado final será:

 {“MacOS”: 300, “Windows”: 400, “other”: 33}.

E a “raridade” é determinada de forma diferente dependendo da categoria e com base no limite designado para esta categoria.

Pode ser configurável com base nas preferências e dados desejados do cliente

 other_thresholds = { 'br': 0.06, 'os': 0.04, 'cc': 0.02, 'lc': 0.02, 'ref': 0.02, 'so': 0.03, 'me': 0.03, 'ca': 0.03, 'cc': 0.02, 'dv': 0.02, 'rg': 0.01, 'ct': 0.01 }

Foram implementadas 2 funções para conseguir isso

 def get_groups_by_treshholds(df,column_name): """Calculate total values for all columns""" if column_name in EXCLUDED_COLUMNS: return counter = count_dict_values(df[column_name]) total = sum(counter.values()) list1 = [] for key, value in counter.items(): if not (value / total) < other_thresholds[column_name]: list1.append(key) return list1 def create_group_columns(df): column_values = [] for key in other_thresholds.keys(): groups = get_groups_by_treshholds(df, key) if not groups: continue for group in groups: column_values.append(f"{key}_{group}") column_values.append(f"{key}_other") return column_values column_values = create_group_columns(df) column_values

Saída

 ['br_Chrome', 'br_Firefox', 'os_Mac OS', 'os_other', 'cc_UA', 'cc_GB', 'cc_other', 'dv_mobile', 'dv_desktop', 'dv_other']

Ao trabalhar com modelos de aprendizado de máquina, é crucial que os dados de entrada estejam em um formato que o modelo possa compreender. Os modelos de aprendizado de máquina normalmente exigem valores numéricos (inteiros, flutuantes) em vez de estruturas de dados complexas como JSON.


Portanto, novamente, é preferível um pouco mais de pré-processamento de nossos dados para atender a esse requisito.


Eu criei uma função create_exploded_df onde cada recurso é representado como uma coluna separada e as linhas contêm os valores numéricos correspondentes. (Ainda não é o ideal, mas foi a melhor solução que consegui produzir)


 def create_exploded_df(df): """ Function which creates a new data set, iterates through the old one and fill in values according to their belongings (br_other, etc..) """ new_df = df[['projectID', 'statisticsGathered']] for group in column_values: new_df[group] = 0 new_df_cols = new_df.columns df_cols = df.columns for column in df_cols: if column in ['projectID', 'statisticsGathered']: continue for index, row in enumerate(df[column]): if column in EXCLUDED_COLUMNS: continue for key, value in row.items(): total = 0 if (a:=f"{column}_{key}") in new_df_cols: new_df[a][index] = value else: total += value new_df[f"{column}_other"][index] = total return new_df new_df = create_exploded_df(df) new_df.to_csv("2-weeks-exploded.csv") new_df

Saída

Figura 6 – Características do Modelo 3.3 Preencha horas

Outro problema com o formato dos dados que tivemos é que se não houvesse tráfego para um projeto em uma hora específica, em vez de criar uma linha em branco, não haveria nenhuma linha, o que é inconveniente considerando o fato de que o modelo foi projetado para prever dados para o próximo período de tempo (por exemplo, a próxima hora). No entanto, não é viável treinar o modelo para fazer previsões se não houver dados disponíveis para o período inicial.


Portanto, escrevi um script que encontraria horas perdidas e inseriria linhas em branco quando uma hora fosse ignorada

Figura 7 – Horas Preenchidas

3.4 Adicionar e mudar colunas de destino

Em relação ao treinamento do modelo, a abordagem principal foi usar os dados da hora anterior como alvo do modelo. Isso permite que o modelo preveja o tráfego futuro com base nos dados atuais.

 def sort_df_and_assign_targets(df): df = df.copy() df = df.sort_values(by=['projectID', 'statisticsGathered']) for column_name in df.columns: if not column_name.endswith('target'): continue df[column_name] = df.groupby('projectID')[column_name].shift(-1) return df new_df = sort_df_and_assign_targets(new_df)

Saída

Figure 8 - Model Predictions









3.5 Dividir statisticsGathered reunidas em colunas separadas

A principal razão para tal abordagem é que statisticsGathered era um objeto datetime , cujos modelos que tentei usar (verifique as seções subsequentes) não são capazes de processá-lo e identificar o padrão correto.


Isso resultou em métricas MSE/MRSE terríveis. Portanto, durante o desenvolvimento, foi tomada a decisão de separar os recursos para day , month e hour , o que melhorou significativamente os resultados.

 def split_statistic_gathered(df): df['Month'] = df['statisticsGathered'].dt.month.astype(int) # as int df['Day'] = df['statisticsGathered'].dt.day.astype(int) # as int df['Hour'] = df['statisticsGathered'].dt.hour df = df.drop('statisticsGathered', axis = 1) return df new_df = split_statistic_gathered(new_df) new_df

Saída
Figure 9 - Converted statisticsGathered


E é isso! Vamos pular para o treinamento em si! 🎉🎉🎉






4. Regressão Linear

Bem, eu acho que a previsão real foi a parte mais desafiadora durante a construção deste aplicativo.

A primeira coisa que eu queria tentar é usar o modelo LinearRegression :


Implementei as seguintes funções:

 def create_model_for_target(train_df, target_series):    X_train, x_test, Y_train, y_test = train_test_split(train_df, target_series, test_size=0.3, shuffle=False)    reg = LinearRegression()    reg.fit(X_train, Y_train)    y_pred = reg.predict(x_test)    return {"y_test": y_test, "y_pred": y_pred} def create_models_for_targets(df):    models_data = dict()    df = df.dropna()    train_df = clear_df(df)    for target_name in df[[column_name for column_name in df.columns if column_name.endswith("target")]]:        models_data[target_name] = create_model_for_target(train_df, df[target_name])    return models_data


Explicação

Para cada coluna de destino, dividimos os dados em conjuntos de treinamento e teste. Em seguida, treinamos um modelo LinearRegression nos dados de treinamento e fazemos previsões nos dados de teste.

Para avaliar se os resultados estão corretos, adicionei a função que reúne as métricas necessárias e produz a saída

 def evaluate_models(data):    evaluation = []    for target, results in data.items():        y_test, y_pred = results['y_test'], results['y_pred']        mse = mean_squared_error(y_test, y_pred)        rmse = mean_squared_error(y_test, y_pred) ** 0.5        mae = mean_absolute_error(y_test, y_pred)        mean_y = y_test.mean()        median_y = y_test.median()        evaluation.append({'target': target, 'mse': mse, 'rmse': rmse, 'mae': mae, 'mean_y': mean_y, 'median_y': median_y})    return pd.DataFrame(evaluation)

Saída

Eu escrevi um script que gerou a saída e a salvei em um arquivo Excel, contabilizando os valores mse , rmse , mae e mean_y

Figura 10 – Resultados Iniciais (Sem Total)


Como você pode ver, as métricas não são satisfatórias e os dados de tráfego previstos estarão longe de ser precisos e não serão adequados para meus objetivos de previsões de tráfego.

Portanto, tomei a decisão de prever totais de visitantes por hora, de modo que foram criadas as seguintes funções


 def add_target_column(df, by):  totals_series = df.apply(lambda x: sum(x[[column for column in df.columns if column.startswith(by)]]), axis=1)  df['total'] = totals_series  df[f'total_{by}_target'] = totals_series  return df def shift_target_column(df, by):  df = df.sort_values(by=['projectID', 'statisticsGathered'], ignore_index=True)  df['total_target'] = df.groupby('projectID')[f'total_{by}_target'].shift(-1)  return df new_df = add_target_column(new_df, 'br') new_df = shift_target_column(new_df, 'br') new_df[['total_br_target']]


Saída

Figure 11 - Total Target Esta função pega uma categoria específica e calcula o total de visitantes com base nela. Isso funciona porque o número total de valores de dispositivo seria igual ao número total de valores de sistema operacional.


Com essa abordagem, o modelo apresentou resultados 10 vezes melhores do que antes .



5. Conclusão

Se estamos falando sobre este caso, é um recurso quase aceitável e pronto para usar. Os clientes agora podem planejar a alocação de orçamento e o dimensionamento do servidor dependendo do resultado dessas previsões

Figure 12 -Total Results As previsões divergem dos valores reais em cerca de 2,45 visitantes (uma vez que RMSE = √MSE ) . O que não pode ter nenhum impacto negativo crucial para as necessidades de marketing.


Como este artigo se tornou bastante extenso e o aplicativo continua em desenvolvimento, faremos uma pausa aqui. Continuaremos a refinar essa abordagem no futuro e manterei vocês atualizados!


Obrigado pela leitura e pela sua atenção! Estou ansioso para ouvir seus comentários e opiniões na seção de comentários. Espero que esta informação seja útil para os seus objetivos!


E boa sorte!