En este artículo quiero compartir mi experiencia en el desarrollo de un servicio de inteligencia artificial para una plataforma de análisis web, llamado Swetrix.
Mi objetivo era desarrollar un modelo de aprendizaje automático que pudiera predecir el tráfico futuro del sitio web en función de los datos que se muestran en la siguiente captura de pantalla.
El objetivo final es tener una visión clara para el cliente del tráfico que aparecerá en su sitio web en el futuro, permitiéndole así obtener mejores conocimientos y mejorar la planificación empresarial en general.
Durante la planificación, se tomó la decisión de continuar con la arquitectura de microservicio con el intermediario de mensajes RabbitMQ para la comunicación entre los servicios de IA y API.
En primer lugar, necesitamos recopilar datos con una tarea cron cada hora en una base de datos separada. Decidimos elegir ClickHouse, ya que en él se almacenan los datos originales de los sitios web de Swetrix. Los detalles sobre el formato se cubrirán en las siguientes secciones.
Se eligió RabbitMQ como intermediario de mensajes debido a su simplicidad y a la necesidad de establecer una comunicación entre los servicios de IA y API. Analicemos todo y verifiquemos la lógica principal.
El servicio Swetrix-AI utilizará el marco NestJs para el backend y scripts de Python para el preprocesamiento de datos y las predicciones de modelos.
Reunimos los siguientes datos sobre proyectos en una tabla analytics
. Ya has visto la versión renderizada de estos datos en la primera sección del artículo.
Pude lograr este resultado (casi aceptable) con la siguiente consulta:
@Cron(CronExpression.EVERY_HOUR) async insertHourlyProjectData(): Promise<void> { const gatherProjectsData = ` INSERT INTO analytics.hourly_projects_data (UniqueID, projectID, statisticsGathered, br_keys, br_vals, os_keys, os_vals, lc_keys, lc_vals, ref_keys, ref_vals, so_keys, so_vals, me_keys, me_vals, ca_keys, ca_vals, cc_keys, cc_vals, dv_keys, dv_vals, rg_keys, rg_vals, ct_keys, ct_vals) SELECT generateUUIDv4() as UniqueID, pid as projectID, toStartOfHour(now()) as statisticsGathered, groupArray(br) as br_keys, groupArray(br_count) as br_vals, groupArray(os) as os_keys, groupArray(os_count) as os_vals, ... groupArray(ct) as ct_keys, groupArray(ct_count) as ct_vals FROM ( SELECT pid, br, count(*) as br_count, os, count(*) as os_count, ... ct, count(*) as ct_count FROM analytics.analytics GROUP BY pid, br, os, lc, ref, so, me, ca, cc, dv, rg, ct ) GROUP BY pid; ` try { await clickhouse.query(gatherProjectsData).toPromise() } catch (e) { console.error( `[CRON WORKER] Error whilst gathering hourly data for all projects: ${e}`, )
La función está programada para ejecutarse cada hora mediante un trabajo cron. Recopila e inserta datos analíticos en un clickhouse analytics.hourly_projects_data
.
Producción
Debido a las limitaciones de ClickHouse, no pude lograr el formato deseado de los datos. Por lo tanto, decidí utilizar pandas
para completar el preprocesamiento necesario para el entrenamiento del modelo.
Específicamente usé Python para hacer lo siguiente:
Combine claves y valores relacionados con una categoría en un campo JSON, por ejemplo combinando claves y valores de dispositivos en un objeto como tal.
os_keys = {“Windows”, ”MacOS”, ”MacOS”, ”MacOS”, ”Linux”} os_values = {1, 2, 2, 1, 5}
En:
os = {“Windows”: 1, “MacOS”: 5, “Linux”: 5}
Adjuntando el código y la salida:
def format_data(keys_list, vals_list, threshold): """ Format data by converting string representations of lists to actual lists, then sums up the counts for each key. Keys with counts below a specified threshold are aggregated into 'Other'. """ counts = defaultdict(int) for keys_str, vals_str in zip(keys_list, vals_list): keys = ast.literal_eval(keys_str) vals = ast.literal_eval(vals_str) for key, val in zip(keys, vals): counts[key] += val final_data = defaultdict(int) for value, count in counts.items(): final_data[value] = count return dict(final_data) def process_group(group): """ Combine specific groups by a group clause, and make a """ result = {} for col in group.columns: if col.endswith('_keys'): prefix = col.split('_')[0] # Extract prefix to identify the category (eg, 'br' for browsers) threshold = other_thresholds.get(prefix, 1) # Get the threshold for this category, default to 1 vals_col = col.replace('_keys', '_vals') keys_list = group[col].tolist() vals_list = group[vals_col].tolist() result[col.replace('_keys', '')] = format_data(keys_list, vals_list, threshold) return pd.Series(result)
Este formato de datos no se usará para la predicción en sí, yo diría, es más para almacenarlos en la base de datos y con fines de depuración para verificar que no haya valores faltantes y, además, para verificar que el modelo produzca una predicción precisa. resultado.
Para entrenar un modelo adecuado decidí definir otros grupos para varias categorías. Lo que significa que si globalmente el número de instancias de un grupo en una categoría específica está por debajo de cierto porcentaje (%), se agregará como parte de la otra.
Por ejemplo, en la categoría os
tenemos:
{“MacOS”: 300, “Windows”: 400, “Linux”: 23 and “TempleOS”: 10}
Dado que tanto Linux como TempleOS en este caso son extremadamente raros, se combinarán en otro grupo , por lo que el resultado final será:
{“MacOS”: 300, “Windows”: 400, “other”: 33}.
Y la "rareza" se determina de forma diferente según la categoría y en función del umbral designado para esta categoría.
Puede ser configurable en función de las preferencias y datos deseados por el cliente.
other_thresholds = { 'br': 0.06, 'os': 0.04, 'cc': 0.02, 'lc': 0.02, 'ref': 0.02, 'so': 0.03, 'me': 0.03, 'ca': 0.03, 'cc': 0.02, 'dv': 0.02, 'rg': 0.01, 'ct': 0.01 }
Se implementaron 2 funciones para lograr esto.
def get_groups_by_treshholds(df,column_name): """Calculate total values for all columns""" if column_name in EXCLUDED_COLUMNS: return counter = count_dict_values(df[column_name]) total = sum(counter.values()) list1 = [] for key, value in counter.items(): if not (value / total) < other_thresholds[column_name]: list1.append(key) return list1 def create_group_columns(df): column_values = [] for key in other_thresholds.keys(): groups = get_groups_by_treshholds(df, key) if not groups: continue for group in groups: column_values.append(f"{key}_{group}") column_values.append(f"{key}_other") return column_values column_values = create_group_columns(df) column_values
Producción
['br_Chrome', 'br_Firefox', 'os_Mac OS', 'os_other', 'cc_UA', 'cc_GB', 'cc_other', 'dv_mobile', 'dv_desktop', 'dv_other']
Cuando se trabaja con modelos de aprendizaje automático, es fundamental que los datos de entrada estén en un formato que el modelo pueda comprender. Los modelos de aprendizaje automático normalmente requieren valores numéricos (enteros, flotantes) en lugar de estructuras de datos complejas como JSON.
Por lo tanto, nuevamente, es preferible realizar un poco más de preprocesamiento de nuestros datos para cumplir con este requisito.
He creado una función create_exploded_df
donde cada característica se representa como una columna separada y las filas contienen los valores numéricos correspondientes. (Aún no es ideal, pero fue la mejor solución que pude producir)
def create_exploded_df(df): """ Function which creates a new data set, iterates through the old one and fill in values according to their belongings (br_other, etc..) """ new_df = df[['projectID', 'statisticsGathered']] for group in column_values: new_df[group] = 0 new_df_cols = new_df.columns df_cols = df.columns for column in df_cols: if column in ['projectID', 'statisticsGathered']: continue for index, row in enumerate(df[column]): if column in EXCLUDED_COLUMNS: continue for key, value in row.items(): total = 0 if (a:=f"{column}_{key}") in new_df_cols: new_df[a][index] = value else: total += value new_df[f"{column}_other"][index] = total return new_df new_df = create_exploded_df(df) new_df.to_csv("2-weeks-exploded.csv") new_df
3.3 Rellenar horas
Otro problema con el formato de datos que tuvimos es que si no hubiera tráfico para un proyecto en una hora específica en lugar de crear una fila en blanco, no habría ninguna fila, lo cual es inconveniente considerando el hecho de que el modelo está diseñado para predecir datos para el próximo período de tiempo (por ejemplo, la próxima hora). Sin embargo, no es factible entrenar el modelo para que haga predicciones si no hay datos disponibles para el período de tiempo inicial.
Por lo tanto, escribí un script que encontraría las horas faltantes e insertaría filas en blanco cuando se omitiera una hora.
Con respecto al entrenamiento del modelo, el enfoque principal fue utilizar datos de la hora anterior como objetivo del modelo. Esto permite que el modelo prediga el tráfico futuro basándose en los datos actuales.
def sort_df_and_assign_targets(df): df = df.copy() df = df.sort_values(by=['projectID', 'statisticsGathered']) for column_name in df.columns: if not column_name.endswith('target'): continue df[column_name] = df.groupby('projectID')[column_name].shift(-1) return df new_df = sort_df_and_assign_targets(new_df)
Producción
statisticsGathered
reunidas en columnas separadas La razón principal de este enfoque es que statisticsGathered
era un objeto datetime
, cuyos modelos que intenté usar (consulte las secciones siguientes) no pueden procesarlo ni identificar el patrón correcto.
Eso resultó en métricas terribles MSE/MRSE
. Entonces, durante el desarrollo, se tomó la decisión de separar las funciones por day
, month
y hour
, lo que mejoró significativamente los resultados.
def split_statistic_gathered(df): df['Month'] = df['statisticsGathered'].dt.month.astype(int) # as int df['Day'] = df['statisticsGathered'].dt.day.astype(int) # as int df['Hour'] = df['statisticsGathered'].dt.hour df = df.drop('statisticsGathered', axis = 1) return df new_df = split_statistic_gathered(new_df) new_df
¡Y eso es! ¡Pasemos al entrenamiento en sí! 🎉🎉🎉
Bueno, supongo que la predicción real fue la parte más desafiante durante la creación de esta aplicación.
Lo primero que quería probar es utilizar el modelo LinearRegression
:
Implementé las siguientes funciones:
def create_model_for_target(train_df, target_series): X_train, x_test, Y_train, y_test = train_test_split(train_df, target_series, test_size=0.3, shuffle=False) reg = LinearRegression() reg.fit(X_train, Y_train) y_pred = reg.predict(x_test) return {"y_test": y_test, "y_pred": y_pred} def create_models_for_targets(df): models_data = dict() df = df.dropna() train_df = clear_df(df) for target_name in df[[column_name for column_name in df.columns if column_name.endswith("target")]]: models_data[target_name] = create_model_for_target(train_df, df[target_name]) return models_data
Para cada columna de destino, dividimos los datos en conjuntos de entrenamiento y prueba. Luego entrenamos un modelo LinearRegression
con los datos de entrenamiento y hacemos predicciones sobre los datos de prueba.
Para evaluar que los resultados son correctos, agregué la función que recopila las métricas requeridas y produce el resultado.
def evaluate_models(data): evaluation = [] for target, results in data.items(): y_test, y_pred = results['y_test'], results['y_pred'] mse = mean_squared_error(y_test, y_pred) rmse = mean_squared_error(y_test, y_pred) ** 0.5 mae = mean_absolute_error(y_test, y_pred) mean_y = y_test.mean() median_y = y_test.median() evaluation.append({'target': target, 'mse': mse, 'rmse': rmse, 'mae': mae, 'mean_y': mean_y, 'median_y': median_y}) return pd.DataFrame(evaluation)
Escribí un script que generó el resultado y lo guardé en un archivo de Excel, contabilizando los valores mse
, rmse
, mae
y mean_y
Como puede ver, las métricas no son satisfactorias y los datos de tráfico previstos estarán lejos de ser precisos y no serán adecuados para mis objetivos de pronósticos de tráfico.
Por lo tanto, tomé la decisión de predecir los totales de visitantes por hora, de modo que se crearon las siguientes funciones
def add_target_column(df, by): totals_series = df.apply(lambda x: sum(x[[column for column in df.columns if column.startswith(by)]]), axis=1) df['total'] = totals_series df[f'total_{by}_target'] = totals_series return df def shift_target_column(df, by): df = df.sort_values(by=['projectID', 'statisticsGathered'], ignore_index=True) df['total_target'] = df.groupby('projectID')[f'total_{by}_target'].shift(-1) return df new_df = add_target_column(new_df, 'br') new_df = shift_target_column(new_df, 'br') new_df[['total_br_target']]
Esta función toma una categoría específica y calcula el total de visitantes en función de ella. Esto funciona porque la cantidad total de valores del dispositivo sería la misma que la cantidad total de valores del sistema operativo.
Con este enfoque, el modelo mostró resultados 10 veces mejores que antes .
Si hablamos de este caso, es una característica casi aceptable y lista para usar. Los clientes ahora pueden planificar la asignación de su presupuesto y la ampliación del servidor en función del resultado de estas predicciones.
Las predicciones se desvían de los valores reales en aproximadamente 2,45 visitantes (ya que RMSE = √MSE ) . Lo cual no puede tener ningún impacto negativo y crucial para las necesidades de marketing.
Como este artículo se ha vuelto bastante extenso y la aplicación aún está en desarrollo, haremos una pausa aquí. ¡Continuaremos perfeccionando este enfoque en el futuro y los mantendremos informados!
¡Gracias por leer y tu atención! Espero escuchar sus comentarios y opiniones en la sección de comentarios. ¡Espero que esta información te resulte útil para tus objetivos!
¡Y buena suerte!