この記事では、Swetrix と呼ばれる Web 分析プラットフォーム向けの AI サービスの開発経験を共有したいと思います。
私の目的は、次のスクリーンショットに表示されているデータに基づいて将来のウェブサイトトラフィックを予測する機械学習モデルを開発することでした。
最終目標は、将来的にウェブサイトにどのようなトラフィックが発生するかについて顧客に明確なビジョンを示し、それによって顧客がより優れた洞察を得て、ビジネス計画全体を強化できるようにすることです。
計画中に、AI と API サービス間の通信に RabbitMQ メッセージ ブローカーを使用したマイクロサービス アーキテクチャを採用することが決定されました。
まず、1 時間ごとの cron タスクを使用してデータを別のデータベースに収集する必要があります。Swetrix の Web サイトの元のデータが ClickHouse に保存されているため、ClickHouse を選択することにしました。形式の詳細については、次のセクションで説明します。
RabbitMQはシンプルさからメッセージブローカーとして選ばれ、AIとAPIサービス間の通信を確立する必要があります。すべてを分解して、主なロジックを確認しましょう。
Swetrix-AI サービスでは、バックエンド側に NestJs フレームワークを使用し、データの前処理とモデル予測に Python スクリプトを使用します。
プロジェクトに関する次のデータをanalytics
テーブルに収集します。 このデータのレンダリングされたバージョンは、この記事の最初のセクションですでにご覧になっています。
次のクエリで、この (ほぼ許容できる) 結果を達成できました。
@Cron(CronExpression.EVERY_HOUR) async insertHourlyProjectData(): Promise<void> { const gatherProjectsData = ` INSERT INTO analytics.hourly_projects_data (UniqueID, projectID, statisticsGathered, br_keys, br_vals, os_keys, os_vals, lc_keys, lc_vals, ref_keys, ref_vals, so_keys, so_vals, me_keys, me_vals, ca_keys, ca_vals, cc_keys, cc_vals, dv_keys, dv_vals, rg_keys, rg_vals, ct_keys, ct_vals) SELECT generateUUIDv4() as UniqueID, pid as projectID, toStartOfHour(now()) as statisticsGathered, groupArray(br) as br_keys, groupArray(br_count) as br_vals, groupArray(os) as os_keys, groupArray(os_count) as os_vals, ... groupArray(ct) as ct_keys, groupArray(ct_count) as ct_vals FROM ( SELECT pid, br, count(*) as br_count, os, count(*) as os_count, ... ct, count(*) as ct_count FROM analytics.analytics GROUP BY pid, br, os, lc, ref, so, me, ca, cc, dv, rg, ct ) GROUP BY pid; ` try { await clickhouse.query(gatherProjectsData).toPromise() } catch (e) { console.error( `[CRON WORKER] Error whilst gathering hourly data for all projects: ${e}`, )
この関数は、Cron ジョブを使用して 1 時間ごとに実行されるようにスケジュールされています。分析データを収集し、clickhouse analytics.hourly_projects_data
に挿入します。
出力
ClickHouse の制限により、データの希望する形式を実現できませんでした。そのため、モデルのトレーニングに必要な前処理を完了するために、 pandas
を使用することにしました。
具体的には、Python を使用して次の操作を実行しました。
1 つのカテゴリに関連するキーと値を 1 つの JSON フィールドに結合します。たとえば、デバイスのキーと値を 1 つのオブジェクトに結合します。
os_keys = {“Windows”, ”MacOS”, ”MacOS”, ”MacOS”, ”Linux”} os_values = {1, 2, 2, 1, 5}
の中へ:
os = {“Windows”: 1, “MacOS”: 5, “Linux”: 5}
コードと出力を添付します:
def format_data(keys_list, vals_list, threshold): """ Format data by converting string representations of lists to actual lists, then sums up the counts for each key. Keys with counts below a specified threshold are aggregated into 'Other'. """ counts = defaultdict(int) for keys_str, vals_str in zip(keys_list, vals_list): keys = ast.literal_eval(keys_str) vals = ast.literal_eval(vals_str) for key, val in zip(keys, vals): counts[key] += val final_data = defaultdict(int) for value, count in counts.items(): final_data[value] = count return dict(final_data) def process_group(group): """ Combine specific groups by a group clause, and make a """ result = {} for col in group.columns: if col.endswith('_keys'): prefix = col.split('_')[0] # Extract prefix to identify the category (eg, 'br' for browsers) threshold = other_thresholds.get(prefix, 1) # Get the threshold for this category, default to 1 vals_col = col.replace('_keys', '_vals') keys_list = group[col].tolist() vals_list = group[vals_col].tolist() result[col.replace('_keys', '')] = format_data(keys_list, vals_list, threshold) return pd.Series(result)
この形式のデータは予測自体には使用されず、むしろデータベースに保存してデバッグし、欠損値がないことを確認し、さらにモデルが正確な結果を生成していることを再確認するために使用されます。
適切なモデルをトレーニングするために、さまざまなカテゴリに他のグループを定義することにしました。つまり、特定のカテゴリ内のグループのインスタンス数が全体的に特定のパーセント (%) を下回る場合、そのグループは他のグループの一部として追加されます。
たとえば、 os
カテゴリには次のものがあります。
{“MacOS”: 300, “Windows”: 400, “Linux”: 23 and “TempleOS”: 10}
この場合、Linux と TempleOS はどちらも非常にまれであるため、他のグループに結合され、最終結果は次のようになります。
{“MacOS”: 300, “Windows”: 400, “other”: 33}.
そして、「希少性」はカテゴリーによって異なり、そのカテゴリーに指定されたしきい値に基づいて決定されます。
顧客の好みや希望するデータに基づいて設定できます
other_thresholds = { 'br': 0.06, 'os': 0.04, 'cc': 0.02, 'lc': 0.02, 'ref': 0.02, 'so': 0.03, 'me': 0.03, 'ca': 0.03, 'cc': 0.02, 'dv': 0.02, 'rg': 0.01, 'ct': 0.01 }
これを実現するために2つの機能が実装されました
def get_groups_by_treshholds(df,column_name): """Calculate total values for all columns""" if column_name in EXCLUDED_COLUMNS: return counter = count_dict_values(df[column_name]) total = sum(counter.values()) list1 = [] for key, value in counter.items(): if not (value / total) < other_thresholds[column_name]: list1.append(key) return list1 def create_group_columns(df): column_values = [] for key in other_thresholds.keys(): groups = get_groups_by_treshholds(df, key) if not groups: continue for group in groups: column_values.append(f"{key}_{group}") column_values.append(f"{key}_other") return column_values column_values = create_group_columns(df) column_values
出力
['br_Chrome', 'br_Firefox', 'os_Mac OS', 'os_other', 'cc_UA', 'cc_GB', 'cc_other', 'dv_mobile', 'dv_desktop', 'dv_other']
機械学習モデルを使用する場合、入力データがモデルが理解できる形式であることが重要です。機械学習モデルでは通常、JSON のような複雑なデータ構造ではなく、数値 (整数、浮動小数点数) が必要です。
したがって、この要件を満たすには、もう一度、データをもう少し前処理することが望ましいです。
私は、各特徴が別々の列として表され、行に対応する数値が含まれる関数create_exploded_df
を作成しました。(まだ理想的ではありませんが、これが私が生み出すことができた最善の解決策でした)
def create_exploded_df(df): """ Function which creates a new data set, iterates through the old one and fill in values according to their belongings (br_other, etc..) """ new_df = df[['projectID', 'statisticsGathered']] for group in column_values: new_df[group] = 0 new_df_cols = new_df.columns df_cols = df.columns for column in df_cols: if column in ['projectID', 'statisticsGathered']: continue for index, row in enumerate(df[column]): if column in EXCLUDED_COLUMNS: continue for key, value in row.items(): total = 0 if (a:=f"{column}_{key}") in new_df_cols: new_df[a][index] = value else: total += value new_df[f"{column}_other"][index] = total return new_df new_df = create_exploded_df(df) new_df.to_csv("2-weeks-exploded.csv") new_df
3.3 勤務時間を記入する
私たちが持っていたデータの形式に関するもう 1 つの問題は、特定の時間にプロジェクトのトラフィックがなかった場合、空白の行が作成されるのではなく、行がまったく存在しないことです。これは、モデルが今後の時間枠 (次の 1 時間など) のデータを予測するように設計されているという事実を考慮すると不便です。ただし、最初の時間枠に利用可能なデータがない場合、予測を行うようにモデルをトレーニングすることは現実的ではありません。
そこで、私は、欠落している時間を見つけて、1時間がスキップされたときに空白行を挿入するスクリプトを作成しました。
モデルのトレーニングに関しては、主なアプローチは、前の 1 時間のデータをモデルのターゲットとして使用することでした。これにより、モデルは現在のデータに基づいて将来のトラフィックを予測できます。
def sort_df_and_assign_targets(df): df = df.copy() df = df.sort_values(by=['projectID', 'statisticsGathered']) for column_name in df.columns: if not column_name.endswith('target'): continue df[column_name] = df.groupby('projectID')[column_name].shift(-1) return df new_df = sort_df_and_assign_targets(new_df)
出力
statisticsGathered
を分割する別々の列に収集するこのようなアプローチの主な理由は、 statisticsGathered
がdatetime
オブジェクトであり、私が使用しようとしたモデル (後続のセクションを参照) ではそれを処理できず、正しいパターンを識別できないことです。
その結果、 MSE/MRSE
メトリックはひどいものになりました。そのため、開発中に、 day
、 month
、 hour
ごとに機能を分離するという決定が下され、結果が大幅に向上しました。
def split_statistic_gathered(df): df['Month'] = df['statisticsGathered'].dt.month.astype(int) # as int df['Day'] = df['statisticsGathered'].dt.day.astype(int) # as int df['Hour'] = df['statisticsGathered'].dt.hour df = df.drop('statisticsGathered', axis = 1) return df new_df = split_statistic_gathered(new_df) new_df
以上です!それではトレーニングそのものに移りましょう!🎉🎉🎉
そうですね、このアプリケーションを構築する上で最も困難だったのは、実際の予測だったと思います。
最初に試してみたかったのは、 LinearRegression
モデルを使用することです。
以下の機能を実装しました。
def create_model_for_target(train_df, target_series): X_train, x_test, Y_train, y_test = train_test_split(train_df, target_series, test_size=0.3, shuffle=False) reg = LinearRegression() reg.fit(X_train, Y_train) y_pred = reg.predict(x_test) return {"y_test": y_test, "y_pred": y_pred} def create_models_for_targets(df): models_data = dict() df = df.dropna() train_df = clear_df(df) for target_name in df[[column_name for column_name in df.columns if column_name.endswith("target")]]: models_data[target_name] = create_model_for_target(train_df, df[target_name]) return models_data
各ターゲット列について、データをトレーニング セットとテスト セットに分割します。次に、トレーニング データでLinearRegression
モデルをトレーニングし、テスト データで予測を行います。
結果が正しいかどうかを評価するために、必要なメトリックを収集して出力を生成する関数を追加しました。
def evaluate_models(data): evaluation = [] for target, results in data.items(): y_test, y_pred = results['y_test'], results['y_pred'] mse = mean_squared_error(y_test, y_pred) rmse = mean_squared_error(y_test, y_pred) ** 0.5 mae = mean_absolute_error(y_test, y_pred) mean_y = y_test.mean() median_y = y_test.median() evaluation.append({'target': target, 'mse': mse, 'rmse': rmse, 'mae': mae, 'mean_y': mean_y, 'median_y': median_y}) return pd.DataFrame(evaluation)
私は出力を生成し、それをExcelファイルに保存するスクリプトを書きましたmse
、 rmse
、 mae
、 mean_y
値を計算します。
ご覧のとおり、メトリックは満足できるものではなく、予測されるトラフィック データは正確とはほど遠く、トラフィック予測の目的には適していません。
そこで、1時間あたりの訪問者数を予測することにし、以下の関数を作成しました。
def add_target_column(df, by): totals_series = df.apply(lambda x: sum(x[[column for column in df.columns if column.startswith(by)]]), axis=1) df['total'] = totals_series df[f'total_{by}_target'] = totals_series return df def shift_target_column(df, by): df = df.sort_values(by=['projectID', 'statisticsGathered'], ignore_index=True) df['total_target'] = df.groupby('projectID')[f'total_{by}_target'].shift(-1) return df new_df = add_target_column(new_df, 'br') new_df = shift_target_column(new_df, 'br') new_df[['total_br_target']]
この関数は特定のカテゴリを取得し、それに基づいて合計訪問者数を計算します。デバイス値の合計数が OS 値の合計数と同じになるため、これが機能します。
このようなアプローチにより、モデルは以前よりも 10 倍優れた結果を示しました。
このケースについて言えば、それはほぼ受け入れられ、すぐに使用できる機能です。顧客はこれらの予測の結果に応じて予算配分とサーバーの拡張を計画できるようになりました。
予測値は実際の値から約 2.45 人の訪問者数だけずれています (RMSE = √MSE であるため) 。これはマーケティング ニーズに重大な悪影響を及ぼすことはありません。
この記事はかなり長くなり、アプリはまだ開発中なので、ここで一旦中断します。今後もこのアプローチを改良し続けますので、引き続き最新情報をお伝えします。
読んでいただき、また注目していただき、ありがとうございます。コメント欄で皆さんのフィードバックやご意見をお待ちしております。この情報が皆さんの目的に役立つことを願っています。
そして成功を祈る!