TL;DR
You can automatically solve typical Natural Language Processing tasks (classification, sentiment analysis, etc.) for your text data using an LLM for as little as $10 per 1M rows (depending on the task and the model), all while staying in your dbt environment. Instructions, details, and code are below.
If you are using dbt as your transformation layer, you might have a situation when you want to extract meaningful info from unstructured text data. Such data might include customer reviews, titles, descriptions, Google Analytics sources/mediums, etc. You might want to categorize them into groups or fetch sentiments and tones.
There are several potential solutions. As Python dbt models evolve, there is one more: you can keep these Natural Language Processing tasks inside your dbt environment as one of the dbt models.
If that might be helpful for you, see below a step-by-step guide on how to use OpenAI API in your dbt project. You can reproduce everything from this guide in your environment, having the code and data sample from the GitHub repository (see links at the end).
If you already have a dbt project and data or don't want to reproduce the results, jump to (4) or skip this section completely. Otherwise, you'll need the following:
Set up the dbt project. Official docs
You can simply clone the one I prepared for this guide from GitHub.
Don't forget to create/update your profiles.yml file.
Set up the database. I used Snowflake. Unfortunately, there is no free version, but it does offer a 30-day free trial.
Currently, dbt Python models work only with Snowflake, Databricks, and BigQuery (no PostgreSQL). So, this tutorial should work for any of them, although some details might vary
Prepare source data
As a dataset, I used an R package metadata published in the TidyTuesday repository.
Upload it to your database.
Update the source.yml
file in the dbt project to match your database and schema names.
Get the OpenAI API key
Follow quickstart instructions from official docs.
Note: it's not free, but it is pay-as-you-go. With the test 10-row dataset, you won't be charged more than $1 during your experiments.
To be extra careful, set a spending limit.
Set up External Access Integration in Snowflake
Firstly, if you are solving a classification task, you need categories (a.k.a. classes) to use in your LLM prompt. Basically, you will say: "I have a list of these categories; could you define which one this text belongs to?"
Some options here:
Create a list of predefined categories manually
It suits if you need stable and predictable categories.
Don't forget to add an "Others" category, so the LLM has a fallback when it's uncertain.
Ask LLM in your prompt to suggest a category name whenever it uses the "Others" category.
Upload the predefined list to the raw layer of the database or as a CSV in your dbt project (utilizing dbt seed).
Feed a sample of your data to LLM, and ask it to come up with N categories.
The same approach as the previous one, but we are getting help with the list.
If you use GPT, it's better to pass a seed here for reproducibility.
Go without predefined categories, and let LLM do the work on the go.
This might lead to less predictable results.
At the same time, it is good enough if you are alright with a margin of randomness.
In the GPT use case, it's better to set temperature = 0 to avoid getting different results if you need to rerun.
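If you go with the 2nd option, the request for a category list can be sketched like this. This is a hypothetical helper, not code from the project: the prompt wording, the `n_categories` default, and the function names are my assumptions; only the `temperature=0` and `seed` parameters are real Chat Completions API options.

```python
# Hypothetical sketch of option 2: ask the LLM to propose N categories
# from a sample of titles. Prompt wording and names are assumptions.
def build_category_list_prompt(titles, n_categories):
    """Build a prompt asking the model to propose n_categories for the sample."""
    joined = '|'.join(titles)
    return (
        f'Here is a sample of texts separated by "|": {joined}. '
        f'Suggest {n_categories} category names that cover them. '
        'Return only the category names separated by "|".'
    )

def request_categories(client, titles, n_categories=20):
    """Call the chat API with temperature=0 and a fixed seed for reproducibility."""
    completion = client.chat.completions.create(
        model='gpt-3.5-turbo',
        messages=[{'role': 'user',
                   'content': build_category_list_prompt(titles, n_categories)}],
        temperature=0,   # deterministic-ish sampling
        seed=42,         # best-effort reproducibility across reruns
    )
    return [c.strip() for c in completion.choices[0].message.content.split('|')]
```

Note that `seed` gives only best-effort determinism; combined with `temperature=0`, reruns should return the same category list most of the time.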
In this blog post, I will go with the 3rd option.
Now, let's go to the meat of this post and create a dbt model that will take new text data from the upstream table, feed it to the OpenAI API, and save the category into the table.
As mentioned above, I'm going to use the R packages dataset. R is a highly popular programming language in data analysis. This dataset contains information about R packages from the CRAN project, such as version, license, author, title, description, etc. We are interested in the title
field, as we are going to create a category for each package based on its title.
Prepare the base for the model
- dbt config can be passed via the dbt.config(...) method.
- There are additional arguments in dbt.config; for example, packages is a package requirement.
- A dbt Python model can reference upstream models via dbt.ref('...') or dbt.source('...').
- It must return a DataFrame. Your database will save it as a table.
```python
import os
import openai
import pandas as pd

COL_TO_CATEGORIZE = 'title'

def model(dbt, session):
    import _snowflake

    dbt.config(
        packages=['pandas', 'openai'],
    )

    df = dbt.ref('package').to_pandas()
    df.drop_duplicates(subset=[COL_TO_CATEGORIZE], inplace=True)

    return df
```
Connect to OpenAI API
We need to pass secrets and external_access_integrations to dbt.config. They will contain the secret references stored in your Snowflake External Access Integration.
Note: this feature was released only several days ago and is available only in the beta dbt version 1.8.0-b3.
```python
dbt.config(
    packages=['pandas', 'openai'],
    secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'},
    external_access_integrations=['openai_external_access_integration'],
)

client = openai.OpenAI(
    api_key=_snowflake.get_generic_secret_string('openai_key'),
    organization=_snowflake.get_generic_secret_string('openai_org'),
)
```
Make the dbt model incremental, and turn off full refreshes.
This protects against re-sending already categorized rows to the API on every dbt run, which can happen several times a day. Add materialized='incremental', incremental_strategy='append', and full_refresh = False to dbt.config:

```python
dbt.config(
    materialized='incremental',
    incremental_strategy='append',
    full_refresh = False,
    packages=['pandas', 'openai'],
    secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'},
    external_access_integrations=['openai_external_access_integration'],
)

if dbt.is_incremental:
    pass
```
Add incrementality logic
Fetch the already categorized rows from the current table via dbt.this, similar to normal incremental models:

```python
if dbt.is_incremental:
    categorized_query = f'''
    SELECT DISTINCT "{ COL_TO_CATEGORIZE }" AS primary_key FROM { dbt.this }
    WHERE "category" IS NOT NULL
    '''
    categorized = [row.PRIMARY_KEY for row in session.sql(categorized_query).collect()]
    df = df.loc[~df[COL_TO_CATEGORIZE].isin(categorized), :]
```
Call OpenAI API in batches
We batch the requests to be cost-effective and to stay within the max_tokens constraint:

```python
BATCH_SIZE = 5

n_rows = df.shape[0]
categories = [None for idx in range(n_rows)]
for idx in range(0, n_rows, BATCH_SIZE):
    df_sliced = df.iloc[idx:idx+BATCH_SIZE, :]
    user_prompt = f'```{ "|".join(df_sliced[COL_TO_CATEGORIZE].to_list()) }```'
    chat_completion = client.chat.completions.create(
        messages=[
            {'role': 'system', 'content': SYSTEM_PROMPT},
            {'role': 'user', 'content': user_prompt}
        ],
        model='gpt-3.5-turbo',
        temperature=0,
        max_tokens=10*BATCH_SIZE + 2*BATCH_SIZE,
    )
    gpt_response = chat_completion.choices[0].message.content
    gpt_response = [category.strip() for category in gpt_response.split('|')]
    categories[idx:idx + len(gpt_response)] = gpt_response

df['category'] = categories
df.dropna(subset=['category'], inplace=True)
```
Time to talk about a prompt for LLM. That's what I got:
You will be provided a list of CRAN R package titles in ``` brackets. Titles will be separated by "|" sign. Come up with a category for each title. Return only category names separated by "|" sign.
Final dbt model code

```python
import os
import openai
import pandas as pd

SYSTEM_PROMPT = '''You will be provided a list of CRAN R package titles in ``` brackets.
Titles will be separated by "|" sign.
Come up with a category for each title.
Return only category names separated by "|" sign.
'''
COL_TO_CATEGORIZE = 'title'
BATCH_SIZE = 5

def model(dbt, session):
    import _snowflake

    dbt.config(
        materialized='incremental',
        incremental_strategy='append',
        full_refresh = False,
        packages=['pandas', 'openai'],
        secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'},
        external_access_integrations=['openai_external_access_integration'],
    )

    client = openai.OpenAI(
        api_key=_snowflake.get_generic_secret_string('openai_key'),
        organization=_snowflake.get_generic_secret_string('openai_org'),
    )

    df = dbt.ref('package').to_pandas()
    df.drop_duplicates(subset=[COL_TO_CATEGORIZE], inplace=True)

    if dbt.is_incremental:
        categorized_query = f'''
        SELECT DISTINCT "{ COL_TO_CATEGORIZE }" AS primary_key FROM { dbt.this }
        WHERE "category" IS NOT NULL
        '''
        categorized = [row.PRIMARY_KEY for row in session.sql(categorized_query).collect()]
        df = df.loc[~df[COL_TO_CATEGORIZE].isin(categorized), :]

    n_rows = df.shape[0]
    categories = [None for idx in range(n_rows)]
    for idx in range(0, n_rows, BATCH_SIZE):
        df_sliced = df.iloc[idx:idx+BATCH_SIZE, :]
        user_prompt = f'```{ "|".join(df_sliced[COL_TO_CATEGORIZE].to_list()) }```'
        chat_completion = client.chat.completions.create(
            messages=[
                {'role': 'system', 'content': SYSTEM_PROMPT},
                {'role': 'user', 'content': user_prompt}
            ],
            model='gpt-3.5-turbo',
            temperature=0,
            max_tokens=10*BATCH_SIZE + 2*BATCH_SIZE,
        )
        gpt_response = chat_completion.choices[0].message.content
        gpt_response = [category.strip() for category in gpt_response.split('|')]
        categories[idx:idx + len(gpt_response)] = gpt_response

    df['category'] = categories
    df.dropna(subset=['category'], inplace=True)
    return df
```
OpenAI API pricing is listed here. They charge for the number of tokens in the request and the response. A token is a unit roughly correlated with the number of characters in your text. There are open-source packages to estimate the number of tokens in a given text, for example, Tiktoken. If you want to evaluate it manually, the place to go is the official OpenAI tokenizer here.
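If you only need a ballpark figure and don't want to pull in a tokenizer, a common rule of thumb for English text is roughly 4 characters per token. A tiny helper based on that heuristic (my own rough approximation, not an official formula):

```python
def rough_token_count(text, chars_per_token=4):
    """Very rough token estimate: ~4 characters per token for English text.
    Use tiktoken or the official tokenizer when you need accuracy."""
    return max(1, len(text) // chars_per_token)
```

This is only good enough for budgeting; actual token counts depend on the model's vocabulary.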
In our dataset, there are ~18K titles. Roughly, that equals 320K input tokens (180K for titles and 140K for system prompts if we use batch size = 5) and 50K output tokens. Depending on the model, the costs for the full scan will be:

- GPT-4 Turbo: $4.7. Pricing: input: $10 / 1M tokens; output: $30 / 1M tokens.
- GPT-4: $12.6. Pricing: input: $30 / 1M tokens; output: $60 / 1M tokens.
- GPT-3.5 Turbo: $0.2. Pricing: input: $0.5 / 1M tokens; output: $1.5 / 1M tokens.

The dbt model worked like a charm. I successfully categorized all 18K packages without any gaps. The model proved to be cost-efficient and protected against multiple dbt runs.
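The estimates above can be reproduced with simple arithmetic. A quick sanity-check script (prices hardcoded as quoted in this post; they change over time, so treat them as a snapshot):

```python
# Reproduce the full-scan cost estimates from the per-token prices above.
PRICES_PER_1M = {                      # (input, output) USD per 1M tokens
    'gpt-4-turbo': (10.0, 30.0),
    'gpt-4': (30.0, 60.0),
    'gpt-3.5-turbo': (0.5, 1.5),
}

INPUT_TOKENS = 320_000   # ~180K title tokens + ~140K system-prompt tokens
OUTPUT_TOKENS = 50_000

def full_scan_cost(model):
    """Cost in USD for categorizing the whole dataset with the given model."""
    in_price, out_price = PRICES_PER_1M[model]
    return (INPUT_TOKENS * in_price + OUTPUT_TOKENS * out_price) / 1_000_000

for m in PRICES_PER_1M:
    print(f'{m}: ${full_scan_cost(m):.1f}')
```

Running it prints $4.7 for gpt-4-turbo, $12.6 for gpt-4, and $0.2 for gpt-3.5-turbo, matching the list above.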
I published the results dashboard to Tableau Public here. Feel free to play with it, download the data, and create whatever you desire on top of it.
Some interesting details I found:
- The top category is Data Visualization (1,190 packages, or 6%). I guess this proves the popularity of R as a visualization tool, especially with packages like Shiny, Plotly, and others.
- Fast-growing categories include Data Import and Data Processing. It sounds like R has started to be used more as a data-processing tool.
- There was notable growth in Natural Language Processing in 2019: two years after the famous paper "Attention Is All You Need" and half a year after the GPT-1 release :)

We can use an alternative approach: GPT embeddings. It's much cheaper, but more engineering-heavy, because you have to do the classification part on your own (stay tuned, as I'm going to explore this option in one of the next posts).
Certainly, it makes sense to move this part out of dbt and into cloud functions or whatever infrastructure you use. At the same time, if you'd like to keep it in dbt, this post has you covered.
Avoid adding any other logic to the model. It should do one job: call the LLM and save the result. This will help you avoid having to re-run it.
Chances are high that you use multiple environments in your dbt project. Be mindful, and avoid running this model over and over in each developer environment on every Pull Request.
To do this, you can incorporate logic with if dbt.config.get("target_name") == 'dev'.
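One way to implement that guard is a small helper that caps the rows sent to the API outside production. This is a sketch: the helper name and the dev row limit are my inventions, not part of the original project.

```python
# Sketch: limit the number of rows sent to the LLM in dev targets.
def rows_for_target(target_name, df, dev_limit=10):
    """In 'dev' targets, process only a small sample; elsewhere, everything.
    Works on anything sliceable, including a pandas DataFrame."""
    if target_name == 'dev':
        return df[:dev_limit]
    return df

# Inside the dbt model it could be used as:
# df = rows_for_target(dbt.config.get('target_name'), df)
```

This keeps experiments in developer environments nearly free while production runs stay untouched.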
Response with a delimiter can be unstable.
For example, GPT can return fewer elements than you expected, and it will be hard to map initial titles to the list of categories.
To overcome this, add response_format={ "type": "json_object" }
in your request to require JSON output. See the official docs.
With JSON output, you can ask in the prompt to provide an answer in the format {"title": "category"}, and then map it back to your initial values.
Note it will be more expensive, as it will increase response size.
Strangely enough, the quality of classification dropped dramatically when I switched to JSON mode for GPT-3.5 Turbo.
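The mapping step of the JSON variant can be sketched like this. The helper and the example response are my assumptions for illustration; only json.loads and the {"title": "category"} shape come from the approach described above.

```python
import json

def map_categories(titles, json_text, default=None):
    """Map each title to its category from a {"title": "category"} JSON response.
    Titles missing from the response fall back to `default`."""
    mapping = json.loads(json_text)
    return [mapping.get(t, default) for t in titles]

# Example with a fake model response:
response = '{"Tools for TLS/SSL": "Networking", "Bayesian Mixture Models": "Statistics"}'
map_categories(['Tools for TLS/SSL', 'Bayesian Mixture Models', 'Unknown'], response)
# -> ['Networking', 'Statistics', None]
```

Unlike delimiter splitting, a missing element here shows up as an explicit gap instead of silently shifting every later category by one position.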
There is an alternative in Snowflake: the cortex.complete() function. Check out a great post by Joel Labes on the dbt blog.
That's it! Let me know what you think.
Full code on GitHub: link
Tableau Public dashboard: link
TidyTuesday R dataset: link