Dans cet article, je souhaite partager mon expérience dans le développement d'un service d'IA pour une plateforme d'analyse Web, appelée Swetrix.
Mon objectif était de développer un modèle d'apprentissage automatique qui permettrait de prédire le trafic futur d'un site Web sur la base des données affichées sur la capture d'écran suivante.
L'objectif final est d'avoir une vision claire pour le client du trafic qui apparaîtra sur son site Web à l'avenir, lui permettant ainsi d'obtenir de meilleures informations et d'améliorer la planification commerciale en général.
Au cours de la planification, il a été décidé de poursuivre l'architecture de microservices avec le courtier de messages RabbitMQ pour la communication entre les services IA et API.
Tout d'abord, nous devons rassembler les données avec une tâche périodique horaire dans une base de données distincte. Nous avons décidé de choisir un ClickHouse, car les données originales des sites Web sur Swetrix y sont stockées. Les détails sur le format seront abordés dans les prochaines sections.
RabbitMQ a été choisi comme courtier de messages en raison de sa simplicité et nous devons établir une communication entre les services IA et API. Décomposons tout et vérifions la logique principale
Le service Swetrix-AI utilisera le framework NestJs pour le backend et des scripts Python pour le prétraitement des données et les prédictions de modèles.
Nous rassemblons les données suivantes sur les projets dans un tableau analytics
. Vous avez déjà vu la version rendue de ces données dans la première section de l'article.
J'ai pu obtenir ce résultat (presque acceptable) avec la requête suivante :
@Cron(CronExpression.EVERY_HOUR) async insertHourlyProjectData(): Promise<void> { const gatherProjectsData = ` INSERT INTO analytics.hourly_projects_data (UniqueID, projectID, statisticsGathered, br_keys, br_vals, os_keys, os_vals, lc_keys, lc_vals, ref_keys, ref_vals, so_keys, so_vals, me_keys, me_vals, ca_keys, ca_vals, cc_keys, cc_vals, dv_keys, dv_vals, rg_keys, rg_vals, ct_keys, ct_vals) SELECT generateUUIDv4() as UniqueID, pid as projectID, toStartOfHour(now()) as statisticsGathered, groupArray(br) as br_keys, groupArray(br_count) as br_vals, groupArray(os) as os_keys, groupArray(os_count) as os_vals, ... groupArray(ct) as ct_keys, groupArray(ct_count) as ct_vals FROM ( SELECT pid, br, count(*) as br_count, os, count(*) as os_count, ... ct, count(*) as ct_count FROM analytics.analytics GROUP BY pid, br, os, lc, ref, so, me, ca, cc, dv, rg, ct ) GROUP BY pid; ` try { await clickhouse.query(gatherProjectsData).toPromise() } catch (e) { console.error( `[CRON WORKER] Error whilst gathering hourly data for all projects: ${e}`, )
La fonction est programmée pour s'exécuter toutes les heures à l'aide d'une tâche Cron. Il rassemble et insère des données analytiques dans un clickhouse analytics.hourly_projects_data
.
Sortir
En raison des limitations de ClickHouse, je n'ai pas pu obtenir le format souhaité pour les données. J'ai donc décidé d'utiliser pandas
pour compléter le prétraitement, nécessaire à la formation du modèle.
Plus précisément, j'ai utilisé Python pour effectuer les opérations suivantes :
Combinez les clés et les valeurs liées à une catégorie dans un seul champ JSON, par exemple en combinant les clés et les valeurs des appareils en un seul objet en tant que tel.
os_keys = {“Windows”, ”MacOS”, ”MacOS”, ”MacOS”, ”Linux”} os_values = {1, 2, 2, 1, 5}
Dans:
os = {“Windows”: 1, “MacOS”: 5, “Linux”: 5}
Joindre le code et la sortie :
def format_data(keys_list, vals_list, threshold): """ Format data by converting string representations of lists to actual lists, then sums up the counts for each key. Keys with counts below a specified threshold are aggregated into 'Other'. """ counts = defaultdict(int) for keys_str, vals_str in zip(keys_list, vals_list): keys = ast.literal_eval(keys_str) vals = ast.literal_eval(vals_str) for key, val in zip(keys, vals): counts[key] += val final_data = defaultdict(int) for value, count in counts.items(): final_data[value] = count return dict(final_data) def process_group(group): """ Combine specific groups by a group clause, and make a """ result = {} for col in group.columns: if col.endswith('_keys'): prefix = col.split('_')[0] # Extract prefix to identify the category (eg, 'br' for browsers) threshold = other_thresholds.get(prefix, 1) # Get the threshold for this category, default to 1 vals_col = col.replace('_keys', '_vals') keys_list = group[col].tolist() vals_list = group[vals_col].tolist() result[col.replace('_keys', '')] = format_data(keys_list, vals_list, threshold) return pd.Series(result)
Ce format de données ne sera pas utilisé pour la prédiction elle-même, je dirais, mais plutôt pour le stocker dans la base de données et à des fins de débogage afin de vérifier qu'il n'y a pas de valeurs manquantes et, en outre, de vérifier que le modèle produit un résultat précis. résultat.
Pour former un modèle adéquat, j'ai décidé de définir d'autres groupes pour différentes catégories. Ce qui signifie que si globalement le nombre d'instances d'un groupe dans une catégorie spécifique est inférieur à un certain pourcentage (%), il sera ajouté dans l'autre.
Par exemple, dans la catégorie os
nous avons :
{“MacOS”: 300, “Windows”: 400, “Linux”: 23 and “TempleOS”: 10}
Étant donné que Linux et TempleOS sont extrêmement rares dans ce cas, ils seront combinés dans un autre groupe , le résultat final sera donc :
{“MacOS”: 300, “Windows”: 400, “other”: 33}.
Et la « rareté » est déterminée différemment selon la catégorie et en fonction du seuil désigné pour cette catégorie.
Il peut être configurable en fonction des préférences et des données souhaitées pour le client
other_thresholds = { 'br': 0.06, 'os': 0.04, 'cc': 0.02, 'lc': 0.02, 'ref': 0.02, 'so': 0.03, 'me': 0.03, 'ca': 0.03, 'cc': 0.02, 'dv': 0.02, 'rg': 0.01, 'ct': 0.01 }
Deux fonctions ont été implémentées pour y parvenir
def get_groups_by_treshholds(df,column_name): """Calculate total values for all columns""" if column_name in EXCLUDED_COLUMNS: return counter = count_dict_values(df[column_name]) total = sum(counter.values()) list1 = [] for key, value in counter.items(): if not (value / total) < other_thresholds[column_name]: list1.append(key) return list1 def create_group_columns(df): column_values = [] for key in other_thresholds.keys(): groups = get_groups_by_treshholds(df, key) if not groups: continue for group in groups: column_values.append(f"{key}_{group}") column_values.append(f"{key}_other") return column_values column_values = create_group_columns(df) column_values
Sortir
['br_Chrome', 'br_Firefox', 'os_Mac OS', 'os_other', 'cc_UA', 'cc_GB', 'cc_other', 'dv_mobile', 'dv_desktop', 'dv_other']
Lorsque vous travaillez avec des modèles d'apprentissage automatique, il est essentiel que les données d'entrée soient dans un format que le modèle peut comprendre. Les modèles d'apprentissage automatique nécessitent généralement des valeurs numériques (entiers, flottants) plutôt que des structures de données complexes comme JSON.
Par conséquent, encore une fois, il est préférable d’effectuer un peu plus de prétraitement de nos données pour répondre à cette exigence.
J'ai créé une fonction create_exploded_df
où chaque fonctionnalité est représentée sous forme de colonne distincte et les lignes contiennent les valeurs numériques correspondantes. (Ce n'est pas encore idéal, mais c'est la meilleure solution que j'ai pu produire)
def create_exploded_df(df): """ Function which creates a new data set, iterates through the old one and fill in values according to their belongings (br_other, etc..) """ new_df = df[['projectID', 'statisticsGathered']] for group in column_values: new_df[group] = 0 new_df_cols = new_df.columns df_cols = df.columns for column in df_cols: if column in ['projectID', 'statisticsGathered']: continue for index, row in enumerate(df[column]): if column in EXCLUDED_COLUMNS: continue for key, value in row.items(): total = 0 if (a:=f"{column}_{key}") in new_df_cols: new_df[a][index] = value else: total += value new_df[f"{column}_other"][index] = total return new_df new_df = create_exploded_df(df) new_df.to_csv("2-weeks-exploded.csv") new_df
3.3 Remplir les heures
Un autre problème avec le format des données que nous avions est que s'il n'y avait pas de trafic pour un projet au cours d'une heure spécifique au lieu de créer une ligne vide, il n'y aurait aucune ligne du tout, ce qui est gênant compte tenu du fait que le modèle est conçu pour prédire les données pour la période à venir (par exemple, l'heure suivante). Cependant, il n’est pas possible d’entraîner le modèle pour faire des prédictions si aucune donnée n’est disponible pour la période initiale.
C'est pourquoi j'ai écrit un script qui trouverait les heures manquantes et insérerait des lignes vides lorsqu'une heure est sautée
Concernant la formation du modèle, l'approche principale consistait à utiliser les données de l'heure précédente comme cible du modèle. Cela permet au modèle de prédire le trafic futur sur la base des données actuelles.
def sort_df_and_assign_targets(df): df = df.copy() df = df.sort_values(by=['projectID', 'statisticsGathered']) for column_name in df.columns: if not column_name.endswith('target'): continue df[column_name] = df.groupby('projectID')[column_name].shift(-1) return df new_df = sort_df_and_assign_targets(new_df)
Sortir
statisticsGathered
diviséesRassemblées en colonnes séparées La raison principale d'une telle approche est que statisticsGathered
était un objet datetime
, que les modèles que j'ai essayé d'utiliser (consultez les sections suivantes) ne sont pas capables de le traiter et d'identifier le modèle correct.
Cela a abouti à de terribles mesures MSE/MRSE
. Ainsi, au cours du développement, la décision a été prise de séparer les fonctionnalités pour day
, month
et hour
, ce qui a considérablement amélioré les résultats.
def split_statistic_gathered(df): df['Month'] = df['statisticsGathered'].dt.month.astype(int) # as int df['Day'] = df['statisticsGathered'].dt.day.astype(int) # as int df['Hour'] = df['statisticsGathered'].dt.hour df = df.drop('statisticsGathered', axis = 1) return df new_df = split_statistic_gathered(new_df) new_df
Et c'est tout! Passons à la formation elle-même ! 🎉🎉🎉
Eh bien, je suppose que la prédiction réelle a été la partie la plus difficile de la création de cette application.
La première chose que je voulais essayer est d'utiliser le modèle LinearRegression
:
J'ai implémenté les fonctions suivantes :
def create_model_for_target(train_df, target_series): X_train, x_test, Y_train, y_test = train_test_split(train_df, target_series, test_size=0.3, shuffle=False) reg = LinearRegression() reg.fit(X_train, Y_train) y_pred = reg.predict(x_test) return {"y_test": y_test, "y_pred": y_pred} def create_models_for_targets(df): models_data = dict() df = df.dropna() train_df = clear_df(df) for target_name in df[[column_name for column_name in df.columns if column_name.endswith("target")]]: models_data[target_name] = create_model_for_target(train_df, df[target_name]) return models_data
Pour chaque colonne cible, nous divisons les données en ensembles de formation et de test. Nous formons ensuite un modèle LinearRegression
sur les données d'entraînement et effectuons des prédictions sur les données de test.
Afin d'évaluer que les résultats sont corrects, j'ai ajouté la fonction qui rassemble les métriques requises et produit le résultat.
def evaluate_models(data): evaluation = [] for target, results in data.items(): y_test, y_pred = results['y_test'], results['y_pred'] mse = mean_squared_error(y_test, y_pred) rmse = mean_squared_error(y_test, y_pred) ** 0.5 mae = mean_absolute_error(y_test, y_pred) mean_y = y_test.mean() median_y = y_test.median() evaluation.append({'target': target, 'mse': mse, 'rmse': rmse, 'mae': mae, 'mean_y': mean_y, 'median_y': median_y}) return pd.DataFrame(evaluation)
J'ai écrit un script qui a généré la sortie et l'a enregistré dans un fichier Excel, comptabilisant les valeurs mse
, rmse
, mae
et mean_y
.
Comme vous pouvez le constater, les mesures ne sont pas satisfaisantes et les données de trafic prévues seront loin d'être exactes et ne correspondront pas à mes objectifs de prévisions de trafic.
Par conséquent, j'ai pris la décision de prédire le nombre total de visiteurs par heure, afin que les fonctions suivantes soient créées
def add_target_column(df, by): totals_series = df.apply(lambda x: sum(x[[column for column in df.columns if column.startswith(by)]]), axis=1) df['total'] = totals_series df[f'total_{by}_target'] = totals_series return df def shift_target_column(df, by): df = df.sort_values(by=['projectID', 'statisticsGathered'], ignore_index=True) df['total_target'] = df.groupby('projectID')[f'total_{by}_target'].shift(-1) return df new_df = add_target_column(new_df, 'br') new_df = shift_target_column(new_df, 'br') new_df[['total_br_target']]
Cette fonction prend une catégorie spécifique et calcule le nombre total de visiteurs en fonction de celle-ci. Cela fonctionne car le nombre total de valeurs de périphérique serait le même que le nombre total de valeurs de système d'exploitation.
Avec une telle approche, le modèle a montré des résultats 10 fois meilleurs qu'auparavant .
Si nous parlons de ce cas, c'est une fonctionnalité presque acceptable et prête à l'emploi. Les clients peuvent désormais planifier leur allocation budgétaire et la mise à l'échelle de leur serveur en fonction du résultat de ces prévisions.
Les prévisions s'écartent des valeurs réelles d'environ 2,45 visiteurs (puisque RMSE = √MSE ) . Ce qui ne peut avoir aucun impact négatif crucial sur les besoins marketing.
Comme cet article est devenu assez long et que l'application est toujours en cours de développement, nous ferons une pause ici. Nous continuerons d’affiner cette approche à l’avenir et je vous tiendrai au courant !
Merci d'avoir lu et de votre attention! J'ai hâte d'entendre vos commentaires et vos réflexions dans la section commentaires. J'espère que ces informations s'avéreront utiles pour vos objectifs !
Et bonne chance!