Merged
13 changes: 13 additions & 0 deletions samples/dbt/.dbt.yml
@@ -0,0 +1,13 @@
dbt_sample_project:
  outputs:
    dev: # The target environment name (e.g., dev, prod)
      compute_region: us-central1 # Region used for compute operations
      dataset: dbt_sample_dataset # BigQuery dataset where dbt will create models
      gcs_bucket: dbt_sample_bucket # GCS bucket to store output files
      location: US # BigQuery dataset location
      method: oauth # Authentication method
      priority: interactive # Job priority: "interactive" or "batch"
      project: bigframes-dev # GCP project ID
      threads: 1 # Number of threads dbt can use for running models in parallel
      type: bigquery # Specifies the dbt adapter
  target: dev # The default target environment
62 changes: 62 additions & 0 deletions samples/dbt/README.md
@@ -0,0 +1,62 @@
# dbt BigFrames Integration

This repository provides simple examples of using **dbt Python models** with **BigQuery** in **BigFrames** mode.

It includes basic configurations and sample models to help you get started quickly in a typical dbt project.

## Highlights

- `profiles.yml`: configures your connection to BigQuery.
- `dbt_project.yml`: configures your dbt project, **dbt_sample_project**.
- `dbt_bigframes_code_sample_1.py`: an example that reads BigQuery data and performs a basic transformation.
- `dbt_bigframes_code_sample_2.py`: an example that builds an incremental model leveraging BigFrames UDF capabilities.

## Requirements

Before using this project, ensure you have:

- A [Google Cloud account](https://cloud.google.com/free?hl=en)
- A [dbt Cloud account](https://www.getdbt.com/signup) (if using dbt Cloud)
- Python and SQL basics
- Familiarity with dbt concepts and structure

For more, see:
- https://docs.getdbt.com/guides/dbt-python-bigframes
- https://cloud.google.com/bigquery/docs/dataframes-dbt

## Run Locally

Follow these steps to run the Python models using dbt Core.

1. **Install the dbt BigQuery adapter:**

```bash
pip install dbt-bigquery
```

2. **Initialize a dbt project (if not already done):**

```bash
dbt init
```

Follow the prompts to complete setup.

3. **Finish the configuration and add sample code:**

- Edit `~/.dbt/profiles.yml` to finish the configuration (see the sample `.dbt.yml` above for a working BigFrames profile).
- Replace or add code samples in `.../models/example`, as sketched below.

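A minimal BigFrames model for that folder, assuming it is saved under a hypothetical name such as `models/example/my_first_model.py`, might look like:

```python
def model(dbt, session):
    # Run this Python model with BigQuery DataFrames (BigFrames).
    dbt.config(submission_method="bigframes")

    # Read a public BigQuery table into a BigFrames DataFrame and return it;
    # dbt materializes the returned DataFrame as a table named after the model.
    table = "bigquery-public-data.epa_historical_air_quality.temperature_hourly_summary"
    return session.read_gbq(table)
```
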
4. **Run your dbt models:**

To run all models:

```bash
dbt run
```

Or run a specific model:

```bash
dbt run --select your_model_name
```
39 changes: 39 additions & 0 deletions samples/dbt/dbt_sample_project/dbt_project.yml
@@ -0,0 +1,39 @@
> Review comment (Collaborator): Missing license header.

# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'dbt_sample_project'
version: '1.0.0'

# This setting configures which "profile" dbt uses for this project.
profile: 'dbt_sample_project'

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets: # directories to be removed by `dbt clean`
  - "target"
  - "dbt_packages"


# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models

# In this example config, we tell dbt to build all models in the example/
# directory as views. These settings can be overridden in the individual model
# files using the `{{ config(...) }}` macro.
models:
  dbt_sample_project:
    # Optional: These settings (e.g., submission_method, notebook_template_id,
    # etc.) can also be defined directly in the Python model using dbt.config.
    submission_method: bigframes
    # Config indicated by + and applies to all files under models/example/
    example:
      +materialized: view
58 changes: 58 additions & 0 deletions samples/dbt/dbt_sample_project/models/example/dbt_bigframes_code_sample_1.py
@@ -0,0 +1,58 @@
> Review comment (Collaborator): Missing license header.

# This example demonstrates one of the most general usages of transforming raw
# BigQuery data into a processed table using a dbt Python model with BigFrames.
# See more from: https://cloud.google.com/bigquery/docs/dataframes-dbt.
#
# Key defaults when using BigFrames in a dbt Python model for BigQuery:
# - The default materialization is 'table' unless specified otherwise. This
# means dbt will create a new BigQuery table from the result of this model.
# - The default timeout for the job is 3600 seconds (60 minutes). This can be
# adjusted if your processing requires more time.
# - If no runtime template is provided, dbt will automatically create and reuse
# a default one for executing the Python code in BigQuery.
#
# BigFrames provides a pandas-like API for BigQuery data, enabling familiar
# data manipulation directly within your dbt project. This code sample
# illustrates a basic pattern for:
# 1. Reading data from an existing BigQuery dataset.
# 2. Processing it using pandas-like DataFrame operations powered by BigFrames.
# 3. Outputting a cleaned and transformed table, managed by dbt.


def model(dbt, session):
    # Optional: Override settings from your dbt_project.yml file.
    # When both are set, dbt.config takes precedence over dbt_project.yml.
    #
    # Use `dbt.config(submission_method="bigframes")` to tell dbt to execute
    # this Python model using BigQuery DataFrames (BigFrames). This allows you
    # to write pandas-like code that operates directly on BigQuery data
    # without needing to pull all data into memory.
    dbt.config(submission_method="bigframes")

    # Define the BigQuery table path from which to read data.
    table = "bigquery-public-data.epa_historical_air_quality.temperature_hourly_summary"

    # Define the specific columns to select from the BigQuery table.
    columns = ["state_name", "county_name", "date_local", "time_local", "sample_measurement"]

    # Read data from the specified BigQuery table into a BigFrames DataFrame.
    df = session.read_gbq(table, columns=columns)

    # Sort the DataFrame by the specified columns. This prepares the data for
    # `drop_duplicates` to ensure consistent duplicate removal.
    df = df.sort_values(columns).drop_duplicates(columns)

    # Group the DataFrame by 'state_name', 'county_name', and 'date_local'. For
    # each group, calculate the minimum and maximum of the 'sample_measurement'
    # column. The result will be a BigFrames DataFrame with a MultiIndex.
    result = df.groupby(["state_name", "county_name", "date_local"])["sample_measurement"]\
        .agg(["min", "max"])

    # Rename some columns and convert the MultiIndex of the 'result' DataFrame
    # into regular columns. This flattens the DataFrame so 'state_name',
    # 'county_name', and 'date_local' become regular columns again.
    result = result.rename(columns={'min': 'min_temperature', 'max': 'max_temperature'})\
        .reset_index()

    # Return the processed BigFrames DataFrame.
    # In a dbt Python model, this DataFrame will be materialized as a table.
    return result
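
Once `dbt run` completes, the result can be sanity-checked outside dbt with BigFrames directly. A sketch, assuming the `project` and `dataset` configured in the profile above and dbt's default of naming the table after the model file:

```python
import bigframes.pandas as bpd

# Hypothetical fully qualified name: <project>.<dataset>.<model_name>.
df = bpd.read_gbq("bigframes-dev.dbt_sample_dataset.dbt_bigframes_code_sample_1")
print(df.peek())  # Preview a few rows without scanning the full table.
```
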
67 changes: 67 additions & 0 deletions samples/dbt/dbt_sample_project/models/example/dbt_bigframes_code_sample_2.py
@@ -0,0 +1,67 @@
> Review comment (Collaborator): Missing license header.

# This example demonstrates how to build an **incremental dbt Python model**
# using BigFrames.
#
# Incremental models are essential for efficiently processing large datasets by
# only transforming new or changed data, rather than reprocessing the entire
# dataset every time. If the target table already exists, dbt will perform a
# merge based on the specified unique keys; otherwise, it will create a new
# table automatically.
#
# This model also showcases the definition and application of a **BigFrames
# User-Defined Function (UDF)** to add a descriptive summary column based on
# temperature data. BigFrames UDFs allow you to execute custom Python logic
# directly within BigQuery, leveraging BigQuery's scalability.


import bigframes.pandas as bpd


def model(dbt, session):
    # Optional: override settings from dbt_project.yml.
    # When both are set, dbt.config takes precedence over dbt_project.yml.
    dbt.config(
        # Use BigFrames mode to execute this Python model. This enables
        # pandas-like operations directly on BigQuery data.
        submission_method="bigframes",
        # Materialize this model as an 'incremental' table. This tells dbt to
        # only process new or updated data on subsequent runs.
        materialized='incremental',
        # Use the MERGE strategy to update rows during incremental runs.
        incremental_strategy='merge',
        # Define the composite key that uniquely identifies a row in the
        # target table. This key is used by the 'merge' strategy to match
        # existing rows for updates during incremental runs.
        unique_key=["state_name", "county_name", "date_local"],
    )

    # Reference an upstream dbt model or an existing BigQuery table as a
    # BigFrames DataFrame. This allows you to seamlessly use the output of
    # another dbt model as input to this one.
    df = dbt.ref("dbt_bigframes_code_sample_1")

    # Define a BigFrames UDF to generate a temperature description.
    # BigFrames UDFs allow you to define custom Python logic that executes
    # directly within BigQuery. This is powerful for complex transformations.
    @bpd.udf(dataset='dbt_sample_dataset', name='describe_udf')

> Review comment (Collaborator): Should we be using session.udf instead?

    def describe(
        max_temperature: float,
        min_temperature: float,
    ) -> str:
        is_hot = max_temperature > 85.0
        is_cold = min_temperature < 50.0

        if is_hot and is_cold:
            return "Expect both hot and cold conditions today."
        if is_hot:
            return "Overall, it's a hot day."
        if is_cold:
            return "Overall, it's a cold day."
        return "Comfortable throughout the day."

    # Apply the UDF using `combine` and store the result in a column "describe".
    df["describe"] = df["max_temperature"].combine(df["min_temperature"], describe)

    # Return the transformed BigFrames DataFrame.
    # This DataFrame will be the final output of your incremental dbt model.
    # On subsequent runs, only new or changed rows will be processed and merged
    # into the target BigQuery table based on the `unique_key`.
    return df
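
Regarding the review question above: a session-scoped variant is sketched below. It assumes bigframes' `Session.udf` decorator accepts the same `dataset` and `name` arguments as `bigframes.pandas.udf`; check the BigFrames docs for the exact signature. Using the `session` object passed into `model` ties the UDF to that session rather than the global BigFrames session:

```python
def model(dbt, session):
    dbt.config(submission_method="bigframes")
    df = dbt.ref("dbt_bigframes_code_sample_1")

    # Session-scoped UDF (assumed signature mirroring bpd.udf).
    @session.udf(dataset='dbt_sample_dataset', name='describe_udf')
    def describe(max_temperature: float, min_temperature: float) -> str:
        if max_temperature > 85.0:
            return "Overall, it's a hot day."
        return "Comfortable throughout the day."

    df["describe"] = df["max_temperature"].combine(df["min_temperature"], describe)
    return df
```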