# feat: add code samples for dbt bigframes integration #1898
**`profiles.yml`**

```yaml
dbt_sample_project:
  outputs:
    dev: # The target environment name (e.g., dev, prod)
      compute_region: us-central1 # Region used for compute operations
      dataset: dbt_sample_dataset # BigQuery dataset where dbt will create models
      gcs_bucket: dbt_sample_bucket # GCS bucket to store output files
      location: US # BigQuery dataset location
      method: oauth # Authentication method
      priority: interactive # Job priority: "interactive" or "batch"
      project: bigframes-dev # GCP project ID
      threads: 1 # Number of threads dbt can use for running models in parallel
      type: bigquery # Specifies the dbt adapter
  target: dev # The default target environment
```
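Once `profiles.yml` is filled in, you can sanity-check the connection with dbt's built-in debug command (a quick verification step, not part of the samples themselves):

```bash
dbt debug
```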
**`README.md`**

# dbt BigFrames Integration

This repository provides simple examples of using **dbt Python models** with **BigQuery** in **BigFrames** mode.

It includes basic configurations and sample models to help you get started quickly in a typical dbt project.

## Highlights

- `profiles.yml`: configures your connection to BigQuery.
- `dbt_project.yml`: configures your dbt project, **dbt_sample_project**.
- `dbt_bigframes_code_sample_1.py`: an example that reads BigQuery data and performs a basic transformation.
- `dbt_bigframes_code_sample_2.py`: an example that builds an incremental model leveraging BigFrames UDF capabilities.

## Requirements

Before using this project, ensure you have:

- A [Google Cloud account](https://cloud.google.com/free?hl=en)
- A [dbt Cloud account](https://www.getdbt.com/signup) (if using dbt Cloud)
- Basic Python and SQL knowledge
- Familiarity with dbt concepts and structure

For more, see:

- https://docs.getdbt.com/guides/dbt-python-bigframes
- https://cloud.google.com/bigquery/docs/dataframes-dbt

## Run Locally

Follow these steps to run the Python models using dbt Core.

1. **Install the dbt BigQuery adapter:**

   ```bash
   pip install dbt-bigquery
   ```

2. **Initialize a dbt project (if not already done):**

   ```bash
   dbt init
   ```

   Follow the prompts to complete setup.

3. **Finish the configuration and add sample code:**

   - Edit `~/.dbt/profiles.yml` to finish the configuration.
   - Replace or add code samples in `.../models/example`.

4. **Run your dbt models:**

   To run all models:

   ```bash
   dbt run
   ```

   Or run a specific model:

   ```bash
   dbt run --select your_model_name
   ```
**`dbt_project.yml`**

> **Collaborator:** Missing license header.

```yaml
# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models.
name: 'dbt_sample_project'
version: '1.0.0'

# This setting configures which "profile" dbt uses for this project.
profile: 'dbt_sample_project'

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets: # directories to be removed by `dbt clean`
  - "target"
  - "dbt_packages"

# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models

# In this example config, we tell dbt to build all models in the example/
# directory as views. These settings can be overridden in the individual model
# files using the `{{ config(...) }}` macro.
models:
  dbt_sample_project:
    # Optional: These settings (e.g., submission_method, notebook_template_id,
    # etc.) can also be defined directly in the Python model using dbt.config.
    submission_method: bigframes
    # Config prefixed with + applies to all files under models/example/.
    example:
      +materialized: view
```
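As a usage note, the directories listed under `clean-targets` above are exactly what dbt's built-in clean command removes:

```bash
dbt clean
```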
**`dbt_bigframes_code_sample_1.py`**

> **Collaborator:** Missing license header.

```python
# This example demonstrates one of the most general usages of transforming raw
# BigQuery data into a processed table using a dbt Python model with BigFrames.
# See more from: https://cloud.google.com/bigquery/docs/dataframes-dbt.
#
# Key defaults when using BigFrames in a dbt Python model for BigQuery:
# - The default materialization is 'table' unless specified otherwise. This
#   means dbt will create a new BigQuery table from the result of this model.
# - The default timeout for the job is 3600 seconds (60 minutes). This can be
#   adjusted if your processing requires more time.
# - If no runtime template is provided, dbt will automatically create and reuse
#   a default one for executing the Python code in BigQuery.
#
# BigFrames provides a pandas-like API for BigQuery data, enabling familiar
# data manipulation directly within your dbt project. This code sample
# illustrates a basic pattern for:
# 1. Reading data from an existing BigQuery dataset.
# 2. Processing it using pandas-like DataFrame operations powered by BigFrames.
# 3. Outputting a cleaned and transformed table, managed by dbt.


def model(dbt, session):
    # Optional: Override settings from your dbt_project.yml file.
    # When both are set, dbt.config takes precedence over dbt_project.yml.
    #
    # Use `dbt.config(submission_method="bigframes")` to tell dbt to execute
    # this Python model using BigQuery DataFrames (BigFrames). This allows you
    # to write pandas-like code that operates directly on BigQuery data
    # without needing to pull all data into memory.
    dbt.config(submission_method="bigframes")

    # Define the BigQuery table path from which to read data.
    table = "bigquery-public-data.epa_historical_air_quality.temperature_hourly_summary"

    # Define the specific columns to select from the BigQuery table.
    columns = ["state_name", "county_name", "date_local", "time_local", "sample_measurement"]

    # Read data from the specified BigQuery table into a BigFrames DataFrame.
    df = session.read_gbq(table, columns=columns)

    # Sort the DataFrame by the specified columns. This prepares the data for
    # `drop_duplicates` to ensure consistent duplicate removal.
    df = df.sort_values(columns).drop_duplicates(columns)

    # Group the DataFrame by 'state_name', 'county_name', and 'date_local'. For
    # each group, calculate the minimum and maximum of the 'sample_measurement'
    # column. The result will be a BigFrames DataFrame with a MultiIndex.
    result = df.groupby(["state_name", "county_name", "date_local"])["sample_measurement"]\
        .agg(["min", "max"])

    # Rename some columns and convert the MultiIndex of the 'result' DataFrame
    # into regular columns. This flattens the DataFrame so 'state_name',
    # 'county_name', and 'date_local' become regular columns again.
    result = result.rename(columns={'min': 'min_temperature', 'max': 'max_temperature'})\
        .reset_index()

    # Return the processed BigFrames DataFrame.
    # In a dbt Python model, this DataFrame will be materialized as a table.
    return result
```
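After `dbt run`, you can spot-check the materialized output with the `bq` CLI. This sketch assumes the `dbt_sample_dataset` dataset configured in `profiles.yml` and dbt's default behavior of naming the table after the model file:

```bash
bq query --use_legacy_sql=false \
  'SELECT * FROM dbt_sample_dataset.dbt_bigframes_code_sample_1 LIMIT 5'
```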
**`dbt_bigframes_code_sample_2.py`**

> **Collaborator:** Missing license header.
>
> **Collaborator** (on the `@bpd.udf(...)` line): Should we be using […]

```python
# This example demonstrates how to build an **incremental dbt Python model**
# using BigFrames.
#
# Incremental models are essential for efficiently processing large datasets by
# only transforming new or changed data, rather than reprocessing the entire
# dataset every time. If the target table already exists, dbt will perform a
# merge based on the specified unique keys; otherwise, it will create a new
# table automatically.
#
# This model also showcases the definition and application of a **BigFrames
# User-Defined Function (UDF)** to add a descriptive summary column based on
# temperature data. BigFrames UDFs allow you to execute custom Python logic
# directly within BigQuery, leveraging BigQuery's scalability.

import bigframes.pandas as bpd


def model(dbt, session):
    # Optional: override settings from dbt_project.yml.
    # When both are set, dbt.config takes precedence over dbt_project.yml.
    dbt.config(
        # Use BigFrames mode to execute this Python model. This enables
        # pandas-like operations directly on BigQuery data.
        submission_method="bigframes",
        # Materialize this model as an 'incremental' table. This tells dbt to
        # only process new or updated data on subsequent runs.
        materialized='incremental',
        # Use the MERGE strategy to update rows during incremental runs.
        incremental_strategy='merge',
        # Define the composite key that uniquely identifies a row in the
        # target table. This key is used by the 'merge' strategy to match
        # existing rows for updates during incremental runs.
        unique_key=["state_name", "county_name", "date_local"],
    )

    # Reference an upstream dbt model or an existing BigQuery table as a
    # BigFrames DataFrame. This allows you to seamlessly use the output of
    # another dbt model as input to this one.
    df = dbt.ref("dbt_bigframes_code_sample_1")

    # Define a BigFrames UDF to generate a temperature description.
    # BigFrames UDFs allow you to define custom Python logic that executes
    # directly within BigQuery. This is powerful for complex transformations.
    @bpd.udf(dataset='dbt_sample_dataset', name='describe_udf')
    def describe(
        max_temperature: float,
        min_temperature: float,
    ) -> str:
        is_hot = max_temperature > 85.0
        is_cold = min_temperature < 50.0

        if is_hot and is_cold:
            return "Expect both hot and cold conditions today."
        if is_hot:
            return "Overall, it's a hot day."
        if is_cold:
            return "Overall, it's a cold day."
        return "Comfortable throughout the day."

    # Apply the UDF using combine and store the result in a column "describe".
    df["describe"] = df["max_temperature"].combine(df["min_temperature"], describe)

    # Return the transformed BigFrames DataFrame.
    # This DataFrame will be the final output of your incremental dbt model.
    # On subsequent runs, only new or changed rows will be processed and merged
    # into the target BigQuery table based on the `unique_key`.
    return df
```
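In day-to-day use, re-running this model only merges new or changed rows, while dbt's standard `--full-refresh` flag rebuilds the incremental table from scratch. A sketch of both invocations:

```bash
# Incremental run: merges rows matched on the unique_key.
dbt run --select dbt_bigframes_code_sample_2

# Rebuild the table from scratch, discarding previously processed state.
dbt run --select dbt_bigframes_code_sample_2 --full-refresh
```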