要約
LLM を使用すると、テキスト データの一般的な自然言語処理タスク (分類、感情分析など) を、dbt 環境にとどまりながら、100 万行あたり 10 ドルという低価格で自動的に解決できます (タスクとモデルによって異なります)。手順、詳細、およびコードを以下に示します。
dbt を変換レイヤーとして使用している場合、非構造化テキスト データから意味のある情報を抽出したい場合があります。このようなデータには、顧客レビュー、タイトル、説明、Google アナリティクスのソース/メディアなどが含まれます。それらをグループに分類したり、感情やトーンを取得したりする必要があるかもしれません。
潜在的な解決策としては
Python dbt モデルが進化するにつれて、もう 1 つの解決策があります。これらの自然言語処理タスクを dbt モデルの 1 つとして dbt 環境内に保持することができます。
それが役に立つと思われる場合は、dbt プロジェクトで OpenAI API を使用する方法についてのステップバイステップ ガイドを以下で参照してください。GitHub リポジトリのコードとデータ サンプルを使用して、このガイドのすべてを自分の環境で再現できます (最後のリンクを参照)。
すでに dbt プロジェクトとデータがある場合、または結果を再現したくない場合は、(4) に進むか、このセクションを完全にスキップしてください。それ以外の場合は、次のものが必要になります。
dbt プロジェクトをセットアップします。公式ドキュメント
このガイド用に用意したものをGitHubからクローンするだけです。
profiles.yml ファイルを作成/更新することを忘れないでください。
データベースをセットアップします。私はSnowflakeを使いました。残念ながら無料版はありませんが、 30日間の無料トライアルは提供されています。
現在、dbt Python モデルは Snowflake、Databricks、BigQuery でのみ動作します (PostgreSQL は動作しません)。したがって、このチュートリアルはいずれのモデルでも動作するはずですが、詳細は異なる場合があります。
ソースデータを準備する
OpenAI APIキーを取得する
公式ドキュメントのクイックスタート手順に従ってください。
注意: 無料ではありませんが、従量課金制です。したがって、テスト用の 10 行のデータセットでは、実験中に 1 ドルを超える料金は請求されません。
より注意するために、支出限度額を設定してください。
Snowflakeで外部アクセス統合を設定する
まず、分類タスクを解く場合、LLM プロンプトで使用するカテゴリ (クラスとも呼ばれます) が必要です。基本的には、「これらのカテゴリのリストがありますが、このテキストがどのカテゴリに属するか定義できますか?」と言うことになります。
いくつかのオプションは次のとおりです:
定義済みカテゴリのリストを手動で作成する
安定した予測可能なカテゴリが必要な場合に適しています。
ここで「その他」を忘れずに追加してください。そうすれば、不明な場合に LLM がこれらのオプションを使用できるようになります。
LLM が「その他」カテゴリを使用する場合は、プロンプトでカテゴリ名を提案するように依頼します。
定義済みのリストをデータベースの生のレイヤーにアップロードするか、または dbt プロジェクトに CSV としてアップロードします ( dbt seed
を使用)。
データのサンプルを LLM に入力し、N 個のカテゴリを作成するように依頼します。
前回と同じアプローチですが、リストの作成に協力してもらっています。
GPT を使用する場合は、再現性のためにここでシードを使用することをお勧めします。
事前定義されたカテゴリを使用せずに、LLM に作業を任せましょう。
これにより、予測しにくい結果が生じる可能性があります。
同時に、ランダム性の余地が許容できるのであれば、それで十分です。
GPT の使用例では、再実行が必要な場合に異なる結果を回避するために、temperature = 0 を設定することをお勧めします。
このブログ投稿では、3 番目のオプションを選択します。
さて、この投稿の本題に入り、アップストリーム テーブルから新しいテキスト データを取得し、それを OpenAI API にフィードして、カテゴリをテーブルに保存する dbt モデルを作成しましょう。
前述のように、R パッケージ データセットを使用します。R は、データ分析で非常に人気のあるプログラミング言語です。このデータセットには、バージョン、ライセンス、作成者、タイトル、説明など、CRAN プロジェクトの R パッケージに関する情報が含まれています。タイトルに基づいて各パッケージのカテゴリを作成するため、 title
フィールドに注目します。
モデルのベースを準備する
dbt 構成はdbt.config(...)
メソッドを介して渡すことができます。
dbt.config には追加の引数があります。たとえば、 packages
パッケージ要件です。
dbt Python モデルは上流モデルdbt.ref('...')
またはdbt.source('...')
を参照できます。
DataFrame を返す必要があります。データベースはそれをテーブルとして保存します。
import os import openai import pandas as pd COL_TO_CATEGORIZE = 'title' def model(dbt, session): import _snowflake dbt.config( packages=['pandas', 'openai'], ) df = dbt.ref('package').to_pandas() df.drop_duplicates(subset=[COL_TO_CATEGORIZE], inplace=True) return df
OpenAI APIに接続する
secrets
とexternal_access_integrations
dbt.config に渡す必要があります。これには、Snowflake 外部アクセス統合に保存されているシークレット参照が含まれます。
注: この機能は数日前にリリースされたばかりで、ベータ版の dbt バージョン 1.8.0-b3 でのみ利用可能です。
dbt.config( packages=['pandas', 'openai'], secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'}, external_access_integrations=['openai_external_access_integration'], ) client = openai.OpenAI( api_key=_snowflake.get_generic_secret_string('openai_key'), organization=_snowflake.get_generic_secret_string('openai_org'), )
dbt モデルを増分にして、完全更新をオフにします。
dbt run
を実行するたびに、完全なデータが OpenAI に送信されることになります。これは 1 日に数回実行されることもあります。materialized='incremental'
、 incremental_strategy='append'
、 full_refresh = False
追加します。 dbt.config( materialized='incremental', incremental_strategy='append', full_refresh = False, packages=['pandas', 'openai'], secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'}, external_access_integrations=['openai_external_access_integration'], ) if dbt.is_incremental: pass
増分ロジックを追加する
dbt.this
を使用するだけで実行できます。通常の増分モデルと同様です。 if dbt.is_incremental: categorized_query = f''' SELECT DISTINCT "{ COL_TO_CATEGORIZE }" AS primary_key FROM { dbt.this } WHERE "category" IS NOT NULL ''' categorized = [row.PRIMARY_KEY for row in session.sql(categorized_query).collect()] df = df.loc[~df[COL_TO_CATEGORIZE].isin(categorized), :]
OpenAI APIをバッチで呼び出す
max_tokens
制約を追加しました。 BATCH_SIZE = 5 n_rows = df.shape[0] categories = [None for idx in range(n_rows)] for idx in range(0, n_rows, BATCH_SIZE): df_sliced = df.iloc[idx:idx+BATCH_SIZE, :] user_prompt = f'```{ "|".join(df_sliced[COL_TO_CATEGORIZE].to_list()) }```' chat_completion = client.chat.completions.create( messages=[ {'role': 'system', 'content': SYSTEM_PROMPT}, {'role': 'user', 'content': user_prompt} ], model='gpt-3.5-turbo', temperature=0, max_tokens=10*BATCH_SIZE + 2*BATCH_SIZE, ) gpt_response = chat_completion.choices[0].message.content gpt_response = [category.strip() for category in gpt_response.split('|')] categories[idx:idx + len(gpt_response)] = gpt_response df['category'] = categories df.dropna(subset=['category'], inplace=True)
LLM のプロンプトについて話す時間です。私が得たものは次のとおりです。
CRAN R パッケージ タイトルのリストが ``` 括弧で囲まれて提供されます。タイトルは "|" 記号で区切られます。タイトルごとにカテゴリを作成します。"|" 記号で区切られたカテゴリ名のみを返します。
最終的な dbt モデル コード
import os import openai import pandas as pd SYSTEM_PROMPT = '''You will be provided a list of CRAN R package titles in ``` brackets. Titles will be separated by "|" sign. Come up with a category for each title. Return only category names separated by "|" sign. ''' COL_TO_CATEGORIZE = 'title' BATCH_SIZE = 5 def model(dbt, session): import _snowflake dbt.config( materialized='incremental', incremental_strategy='append', full_refresh = False, packages=['pandas', 'openai'], secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'}, external_access_integrations=['openai_external_access_integration'], ) client = openai.OpenAI( api_key=_snowflake.get_generic_secret_string('openai_key'), organization=_snowflake.get_generic_secret_string('openai_org'), ) df = dbt.ref('package').to_pandas() df.drop_duplicates(subset=[COL_TO_CATEGORIZE], inplace=True) if dbt.is_incremental: categorized_query = f''' SELECT DISTINCT "{ COL_TO_CATEGORIZE }" AS primary_key FROM { dbt.this } WHERE "category" IS NOT NULL ''' categorized = [row.PRIMARY_KEY for row in session.sql(categorized_query).collect()] df = df.loc[~df[COL_TO_CATEGORIZE].isin(categorized), :] n_rows = df.shape[0] categories = [None for idx in range(n_rows)] for idx in range(0, n_rows, BATCH_SIZE): df_sliced = df.iloc[idx:idx+BATCH_SIZE, :] user_prompt = f'```{ "|".join(df_sliced[COL_TO_CATEGORIZE].to_list()) }```' chat_completion = client.chat.completions.create( messages=[ {'role': 'system', 'content': SYSTEM_PROMPT}, {'role': 'user', 'content': user_prompt} ], model='gpt-3.5-turbo', temperature=0, max_tokens=10*BATCH_SIZE + 2*BATCH_SIZE, ) gpt_response = chat_completion.choices[0].message.content gpt_response = [category.strip() for category in gpt_response.split('|')] categories[idx:idx + len(gpt_response)] = gpt_response df['category'] = categories df.dropna(subset=['category'], inplace=True) return df
OpenAI API の料金については、こちら をご覧ください。リクエストされたトークンの数と返されたトークンの数に応じて料金が発生します。トークンは、リクエスト内の文字数に関連付けられたインスタンスです。特定のテキストのトークンの数を評価するオープンソース パッケージがあります。たとえば、 Tiktoken などです。手動で評価したい場合は、こちら の公式 OpenAI トークナイザーにアクセスしてください。
私たちのデータセットには、約 18,000 のタイトルがあります。大まかに言うと、320,000 の入力トークン (バッチ サイズ = 5 を使用する場合、180,000 のタイトルと 140,000 のシステム プロンプト) と 50,000 の出力トークンに相当します。モデルに応じて、フルスキャンのコストは次のようになります。
GPT-4 Turbo
: 4.7 ドル。価格: 入力: 100 万トークンあたり 10 ドル、出力: 30 万トークンあたり。GPT-4
: 12.6 ドル。価格: 入力: 30 ドル / 100 万トークン、出力: 60 ドル / 100 万トークン。GPT-3.5 Turbo
: $0.2。価格: 入力: $0.5 / 100万トークン、出力: $1.5 / 100万トークン。dbt モデルは見事に機能しました。18K のパッケージすべてを隙間なく分類できました。このモデルはコスト効率が良く、複数の dbt 実行から保護されていることが証明されました。
結果ダッシュボードを Tableau Public にここで公開しました。自由に操作し、データをダウンロードし、その上に好きなものを作成してください。
私が見つけた興味深い詳細は次のとおりです:
Data Visualization
(1,190 パッケージ、6%) です。これは、特に Shiny、Plotly などのパッケージとともに、視覚化ツールとしての R の人気を証明していると思います。
Data Import
とData Processing
でした。R はデータ処理ツールとしてより多く使用され始めたようです。
Natural Language Processing
た。有名な論文「Attention Is All You Need」の2年後、GPT-1のリリースから半年後のことでした:)別のアプローチとして、 GPT 埋め込みを使用することもできます。
ずっと安いですよ。
ただし、分類部分は自分で行う必要があるため、エンジニアリングの要素が強くなります (このオプションについては、次の投稿で取り上げる予定ですので、お楽しみに)。
確かに、この部分を dbt から削除し、クラウド機能または使用するインフラにプッシュすることは理にかなっています。同時に、これを dbt の下に保持したい場合は、この投稿が役立ちます。
モデルにロジックを追加しないでください。モデルは LLM を呼び出して結果を保存するという 1 つの作業だけを行う必要があります。これにより、再実行を避けることができます。
dbt プロジェクトでは多くの環境を使用している可能性が高いです。各プル リクエストで各開発環境でこのモデルを何度も実行しないように注意する必要があります。
これを行うには、 if dbt.config.get("target_name") == 'dev'
というロジックを組み込むことができます。
区切り文字を使用した応答は不安定になる可能性があります。
たとえば、GPT は予想よりも少ない要素を返す可能性があり、最初のタイトルをカテゴリのリストにマッピングすることが難しくなります。
これを解決するには、リクエストにresponse_format={ "type": "json_object" }
を追加して、JSON 出力を要求します。公式ドキュメントを参照してください。
JSON 出力を使用すると、プロンプトで {"title": "category"} の形式で回答を入力するように要求し、それを初期値にマッピングできます。
応答サイズが大きくなるため、コストが高くなることに注意してください。
不思議なことに、GPT 3.5 Turbo を JSON に切り替えると、分類の品質が劇的に低下しました。
Snowflake にはcortex.complete()関数を使用する代替手段があります。dbt ブログの Joel Labes による素晴らしい投稿をご覧ください。
以上です!ご意見をお聞かせください。
GitHub の完全なコード:リンク
Tableau Public ダッシュボード: リンク
TidyTuesday Rデータセット:リンク