Friday, February 23, 2024

Module 4: Analytics Engineering (DTC DE Zoomcamp Week 4)

dbt (Data Build Tool) Overview

What is dbt?

dbt stands for data build tool. It's a transformation tool that allows us to transform raw data in our Data Warehouse into transformed data that can later be used by Business Intelligence tools and any other data consumers.


dbt also allows us to introduce good software engineering practices by defining a deployment workflow:

 

1. Develop models

2. Test and document models

3. Deploy models with version control and CI/CD.

 

How does dbt work?

dbt works by defining a modeling layer that sits on top of our Data Warehouse. Each table is turned into a model and then transformed into a derived model, which can be stored in the Data Warehouse for persistence.

 

A model consists of:

 

• A *.sql file

• A SELECT statement; no DDL or DML is used

• A file that dbt will compile and run in our Data Warehouse

 

How to use dbt?

dbt has 2 main components, dbt Core and dbt Cloud, with the following characteristics:

 

dbt Cloud - SaaS application to develop and manage dbt projects

 

• Web-based IDE to develop, run and test a dbt project.

• Jobs orchestration.

• Logging and alerting.

• Integrated documentation.

• Free for individuals (one developer seat).

 

dbt Core - open-source project that performs the data transformation

 

• Builds and runs a dbt project (.sql and .yaml files).

• Includes SQL compilation logic, macros and database adapters.

• Includes a CLI interface to run dbt commands locally.

• Open-source and free to use.

 

For this project, I use:

 

• dbt Cloud (dbt Cloud IDE) + GCP BigQuery

• dbt Core on Docker + BigQuery

 

Developing with dbt

Anatomy of a dbt model

dbt models are a combination of SQL (using SELECT statements) and the Jinja templating language.

 

Below is an example of a dbt model:

 

{{

   config(materialized='table')

}}


 

SELECT *

FROM staging.source_table

WHERE record_state = 'ACTIVE'

 

• In the Jinja statement defined within the {{ }} block we call the config function. More information about Jinja and how to use it for dbt in this link.

• The config function is commonly used at the beginning of a model to define a materialization strategy: a strategy for persisting dbt models in a data warehouse.

• There are 4 materialization strategies with the following characteristics:

◦ View: the model is rebuilt as a view on each run, via a create view as statement.

◦ Table: the model is rebuilt as a table on each run, via a create table as statement.

◦ Incremental: allows dbt to insert or update records into a table since the last time dbt was run (see the sketch below this list).

◦ Ephemeral: not directly built into the database; instead, dbt will interpolate the code from this model into dependent models as a common table expression (CTE).
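
Here is that incremental sketch: a minimal example, assuming a hypothetical loaded_at timestamp column on the source table (is_incremental() and {{ this }} are standard dbt constructs):

{{ config(materialized='incremental') }}

select *
from {{ source('staging', 'source_table') }}

{% if is_incremental() %}
-- on incremental runs, only process rows newer than what is already in this model
where loaded_at > (select max(loaded_at) from {{ this }})
{% endif %}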

 

The FROM clause of a dbt model

The FROM clause within a SELECT statement defines the sources of the data to be used.

 

The following sources are available to dbt models:

 

Sources: the data loaded into our Data Warehouse.

 

• We can access this data with the source() function.

• The sources key in our YAML file contains the details of the databases that the source() function can access and translate into proper SQL-valid names.

• Additionally, we can define "source freshness" for each source so that we can check whether a source is "fresh" or "stale", which is useful to check whether our data pipelines are working properly.

• More info about sources in [this link](https://docs.getdbt.com/docs/building-a-dbt-project/using-sources).

 

Seeds: CSV files which can be stored in our repo under the seeds folder.

 

• The repo gives us version control along with all of its benefits.

• Seeds are best suited to static data which changes infrequently.

• Seed usage:

◦ Add a CSV file to our seeds folder.

◦ Run the dbt seed command to create a table in our Data Warehouse.

 

If we update the content of a seed, running dbt seed will append the updated values to the table rather than substituting them. Running dbt seed --full-refresh instead will drop the old table and create a new one.

 

◦ Refer to the seed in our model with the ref() function.

 

◦ More info about seeds in this link.

Here's an example of how we would declare a source in a .yml file:

 

sources:

   - name: staging

     database: production

     schema: trips_data_all

 

     loaded_at_field: record_loaded_at

     tables:

       - name: green_tripdata

       - name: yellow_tripdata

         freshness:

           error_after: {count: 6, period: hour}

 

And here's how we would reference a source in a FROM clause:

 

FROM {{ source('staging','yellow_tripdata') }}

 

• The first argument of the source() function is the source name, and the second is the table name.


 

In the case of seeds, assuming we've got a taxi_zone_lookup.csv file in our seeds folder which contains locationid, borough, zone and service_zone:

 

SELECT

   locationid,

   borough,

   zone,

   replace(service_zone, 'Boro', 'Green') as service_zone

FROM {{ ref('taxi_zone_lookup') }}

 

The ref() function references underlying tables and views in the Data Warehouse. When compiled, it will automatically build the dependencies and resolve the correct schema for us. So, if BigQuery contains a schema/dataset called dbt_dev inside the my_project database which we're using for development and it contains a table called stg_green_tripdata, then the following code...

 

WITH green_data AS (

   SELECT *,

       'Green' AS service_type

   FROM {{ ref('stg_green_tripdata') }}

),

 

...will compile to this:

 

WITH green_data AS (

   SELECT *,

       'Green' AS service_type

   FROM "my_project"."dbt_dev"."stg_green_tripdata"

),

 

• The ref() function translates the referenced table into the full reference, using the database.schema.table structure.

• If we were to run this code in our production environment, dbt would automatically resolve the reference to make it point to our production schema.

 

Defining a source and creating a model

It's time to create our first model.

 

We will begin by creating 2 new folders under our models folder:

 

• staging will have the raw models.

• core will have the models that we will expose at the end to the BI tool, stakeholders, etc.

 

Under staging we will add 2 new files: stg_green_tripdata.sql and schema.yml:

 

# schema.yml

 

version: 2

 

sources:

   - name: staging

     database: our_project

     schema: trips_data_all

 

     tables:

         - name: green_tripdata

         - name: yellow_tripdata

 

• We define our sources in the schema.yml model properties file.

• We are defining the 2 tables for yellow and green taxi data as our sources.

 

-- stg_green_tripdata.sql

 

{{ config(materialized='view') }}

 

select * from {{ source('staging', 'green_tripdata') }} limit 100

 

• This query will create a view in the staging dataset/schema in our database.


 

• We make use of the source() function to access the green taxi data table, which is defined inside the schema.yml file.

 

The advantage of having the properties in a separate file is that we can easily modify the schema.yml file to change the database details and write to different databases without having to modify our stg_green_tripdata.sql file.

 

We may now run the model with the dbt run command, either locally or from dbt Cloud.

 

Macros

Macros are pieces of code in Jinja that can be reused, similar to functions in other languages.

 

dbt already includes a series of macros like config(), source() and ref(), but custom macros can also be defined.

 

Macros allow us to add features to SQL that aren't otherwise available, such as:

 

• Use control structures such as if statements or for loops.

• Use environment variables in our dbt project for production.

• Operate on the results of one query to generate another query.

• Abstract snippets of SQL into reusable macros.

 

Macros are defined in separate .sql files which are typically stored in a macros directory.

 

There are 3 kinds of Jinja delimiters:

 

• {% ... %} for statements (control blocks, macro definitions)

• {{ ... }} for expressions (literals, math, comparisons, logic, macro calls...)

• {# ... #} for comments.
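
For instance, a statement block can generate repetitive SQL with a for loop. A small hypothetical sketch, reusing the green_tripdata source and payment_type column from the examples in these notes:

select
    {% for p in [1, 2, 3] %}
    sum(case when payment_type = {{ p }} then 1 else 0 end) as payment_type_{{ p }}_trips,
    {% endfor %}
    count(*) as total_trips
from {{ source('staging', 'green_tripdata') }}
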

Here's a macro definition example:

 

{# This macro returns the description of the payment_type #}

 

{% macro get_payment_type_description(payment_type) %}

 

   case {{ payment_type }}

       when 1 then 'Credit card'

       when 2 then 'Cash'

       when 3 then 'No charge'

       when 4 then 'Dispute'

       when 5 then 'Unknown'

       when 6 then 'Voided trip'

   end

 

{% endmacro %}

 

• The macro keyword states that the line is a macro definition. It includes the name of the macro as well as the parameters.

• The code of the macro itself goes between 2 statement delimiters. The second statement delimiter contains an endmacro keyword.

• In the code, we can access the macro parameters using expression delimiters.

• The macro returns the code we've defined rather than a specific value.

 

Here's how we use the macro:

 

select

   {{ get_payment_type_description('payment_type') }} as payment_type_description,
   congestion_surcharge::double precision

from {{ source('staging','green_tripdata') }}

where vendorid is not null

 

• We pass the payment_type column name, whose values may be integers from 1 to 6.

 

And this is what it would compile to:


 

select

   case payment_type

       when 1 then 'Credit card'

       when 2 then 'Cash'

       when 3 then 'No charge'

       when 4 then 'Dispute'

       when 5 then 'Unknown'

       when 6 then 'Voided trip'

   end as payment_type_description,

   congestion_surcharge::double precision

from {{ source('staging','green_tripdata') }}

where vendorid is not null

 

 

The macro is replaced by the code contained within the macro definition as well as any variables that we may have passed to the macro parameters.

 

 

Packages

Macros can be exported to packages, similarly to how classes and functions can be exported to libraries in other languages. Packages contain standalone dbt projects with models and macros that tackle a specific problem area.

 

When we add a package to our project, the package's models and macros become part of our own project. A list of useful packages can be found in the dbt package hub.

 

To use a package, we must first create a packages.yml file in the root of our work directory. Here's an example:

 

packages:

 - package: dbt-labs/dbt_utils

   version: 0.8.0

 

After declaring our packages, we need to install them by running the dbt deps command either locally or on dbt Cloud.

 

We may access macros inside a package in a similar way to how Python accesses class methods:

 

select

   {{ dbt_utils.surrogate_key(['vendorid', 'lpep_pickup_datetime']) }} as tripid,
   cast(vendorid as integer) as vendorid,

   -- ...

 

• The surrogate_key() macro generates a hashed surrogate key with the specified fields in the arguments.
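
For reference, surrogate_key() compiles to a hash of the null-safe string casts of the listed columns. A rough illustration of the kind of SQL it generates on BigQuery (not the exact compiled output, which depends on the package version and adapter):

select
    -- illustrative approximation of the macro's output: hash the concatenated,
    -- null-safe string casts of the key columns
    to_hex(md5(cast(
        coalesce(cast(vendorid as string), '') || '-' ||
        coalesce(cast(lpep_pickup_datetime as string), '')
    as string))) as tripid
from {{ source('staging', 'green_tripdata') }}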

 

Variables

As in most other programming languages, variables can be defined and used across our project. Variables can be defined in 2 different ways:

 

• Under the vars keyword inside dbt_project.yml.

 

vars:

   payment_type_values: [1, 2, 3, 4, 5, 6]

 

• As arguments when building or running our project.

 

dbt build --vars 'is_test_run: false'

 

Variables can be used with the var() macro. For example:

 

{% if var('is_test_run', default=true) %}

 

   limit 100

 

{% endif %}

 

• In this example, the default value for is_test_run is true; in the absence of a variable definition, either in the dbt_project.yml file or when running the project, is_test_run would be true.

• Since we passed the value false when running dbt build, the if statement evaluates to false and the code within would not run.
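
Putting it together, a minimal sketch of a model that uses this variable (reusing the green_tripdata source from earlier):

{{ config(materialized='view') }}

select *
from {{ source('staging', 'green_tripdata') }}

-- dev runs keep the default is_test_run=true and only process 100 rows;
-- pass --vars 'is_test_run: false' to build against the full dataset
{% if var('is_test_run', default=true) %}
limit 100
{% endif %}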


 

Setting up dbt Cloud IDE + GCP BigQuery

We will need to create a dbt Cloud account using this link and connect it to our Data Warehouse following these instructions. More detailed instructions are available in this guide.

 

My dbt Cloud IDE + BigQuery setup and testing

 

Setting up dbt on Docker + GCP BigQuery

My dbt-docker-bigquery link

Tuesday, February 13, 2024

Workshop 1 : Data Ingestion (DTC DE Zoomcamp 2024)

 Source

 https://github.com/DataTalksClub/data-engineering-zoomcamp/blob/main/cohorts/2024/workshops/dlt_resources/data_ingestion_workshop.md

dlt Setup

Here's how you would do that on your local machine. I will walk you through it before showing it in Colab as well.

First, install dlt in a new OS environment.

Command prompt

cd /mnt/e/dlt
python -m venv ./env
source ./env/bin/activate
pip install dlt[duckdb]

source ./env/bin/activate

# for the first time, install pandas and streamlit
pip install pandas
pip install streamlit

Data Loading

Create Python script: taxi_data_loading.py

cd /mnt/e/dlt/scripts
vi taxi_data_loading.py
Edit taxi_data_loading.py as below:

import dlt
import duckdb

data = [
    {
        "vendor_name": "VTS",
        "record_hash": "b00361a396177a9cb410ff61f20015ad",
        "time": {
            "pickup": "2009-06-14 23:23:00",
            "dropoff": "2009-06-14 23:48:00"
        },
        "Trip_Distance": 17.52,
        "coordinates": {
            "start": {
                "lon": -73.787442,
                "lat": 40.641525
            },
            "end": {
                "lon": -73.980072,
                "lat": 40.742963
            }
        },
        "Rate_Code": None,
        "store_and_forward": None,
        "Payment": {
            "type": "Credit",
            "amt": 20.5,
            "surcharge": 0,
            "mta_tax": None,
            "tip": 9,
            "tolls": 4.15,
            "status": "booked"
        },
        "Passenger_Count": 2,
        "passengers": [
            {"name": "John", "rating": 4.9},
            {"name": "Jack", "rating": 3.9}
        ],
        "Stops": [
            {"lon": -73.6, "lat": 40.6},
            {"lon": -73.5, "lat": 40.5}
        ]
    },
]

# Define the connection to load to.
# We now use duckdb, but you can switch to Bigquery later
pipeline = dlt.pipeline(pipeline_name="taxi_data_loading", destination='duckdb', dataset_name='taxi_rides')

# Run the pipeline with default settings, and capture the outcome
info = pipeline.run(data, table_name="users", write_disposition="replace")

# Show the outcome
print(info)

Run script:

(env) root@Desktop-Gar:/mnt/e/dlt# python scripts/taxi_data_loading.py

Output:

OS prompt :

Pipeline taxi_data_loading load step completed in 2.29 seconds
1 load package(s) were loaded to destination duckdb and into dataset taxi_rides
The duckdb destination used duckdb:////mnt/e/dlt/taxi_data_loading.duckdb location to store data
Load package 1707886397.1636 is LOADED and contains no failed jobs

dlt pipeline taxi_data_loading show

Open another session

Browser http://localhost:8501/

Navigation –> Explore data
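
Alternatively, the loaded data can be inspected directly with SQL (for example from the duckdb CLI against the taxi_data_loading.duckdb file shown in the output above). A quick sketch, assuming dlt's default naming conventions (snake_case column names, double-underscore child tables for nested lists):

-- row loaded into the users table (dlt normalizes column names to snake_case)
SELECT vendor_name, record_hash, trip_distance FROM taxi_rides.users;

-- nested passengers list, unpacked by dlt into a child table
SELECT name, rating FROM taxi_rides.users__passengers;
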

Incremental Loading

Create Python script: taxi_incremental_loading.py

cd /mnt/e/dlt/scripts
vi taxi_incremental_loading.py
Edit taxi_incremental_loading.py as below:

Python prompt

import dlt
import duckdb

data = [
    {
        "vendor_name": "VTS",
        "record_hash": "b00361a396177a9cb410ff61f20015ad",
        "time": {
            "pickup": "2009-06-14 23:23:00",
            "dropoff": "2009-06-14 23:48:00"
        },
        "Trip_Distance": 17.52,
        "coordinates": {
            "start": {
                "lon": -73.787442,
                "lat": 40.641525
            },
            "end": {
                "lon": -73.980072,
                "lat": 40.742963
            }
        },
        "Rate_Code": None,
        "store_and_forward": None,
        "Payment": {
            "type": "Credit",
            "amt": 20.5,
            "surcharge": 0,
            "mta_tax": None,
            "tip": 9,
            "tolls": 4.15,
            "status": "cancelled"
        },
        "Passenger_Count": 2,
        "passengers": [
            {"name": "John", "rating": 4.4},
            {"name": "Jack", "rating": 3.6}
        ],
        "Stops": [
            {"lon": -73.6, "lat": 40.6},
            {"lon": -73.5, "lat": 40.5}
        ]
    },
]

pipeline = dlt.pipeline(pipeline_name='taxi_incremental_loading', destination='duckdb', dataset_name='taxi_rides')
info = pipeline.run(data, table_name="users", write_disposition="merge", primary_key="ID")

# show the outcome
print(info)

Run script:

(env) root@Desktop-Gar:/mnt/e/dlt# python scripts/taxi_incremental_loading.py

Output:

2024-02-14 12:11:50,609|[WARNING              ]|3864|140488174792704|dlt|reference.py|_verify_schema:357|A column id in table users in schema taxi_incremental_loading is incomplete. It was not bound to the data during normalizations stage and its data type is unknown. Did you add this column manually in code ie. as a merge key?
2024-02-14 12:11:51,046|[WARNING              ]|3864|140488174792704|dlt|reference.py|_verify_schema:357|A column id in table users in schema taxi_incremental_loading is incomplete. It was not bound to the data during normalizations stage and its data type is unknown. Did you add this column manually in code ie. as a merge key?
Pipeline taxi_incremental_loading load step completed in 2.74 seconds
1 load package(s) were loaded to destination duckdb and into dataset taxi_rides
The duckdb destination used duckdb:////mnt/e/dlt/taxi_incremental_loading.duckdb location to store data
Load package 1707887509.7438598 is LOADED and contains no failed jobs

dlt pipeline taxi_incremental_loading show

Found pipeline taxi_incremental_loading in /var/dlt/pipelines

You can now view your Streamlit app in your browser.

Local URL: http://localhost:8502

Network URL: http://172.25.243.204:8502

Navigation Sample

Homework

Solutions :

https://github.com/garjita63/de-zoomcamp-2024/blob/main/homewok/workshop1-data-ingestion.ipynb


Saturday, February 10, 2024

Module 3 : Data Warehouse (DTC DE Zoomcamp 2024)

Data Warehouse

What is a Data Warehouse?


A Data Warehouse (DW) is an OLAP solution meant for reporting and data analysis. Unlike Data Lakes, which follow the Extract Load Transform (ELT) model, DWs commonly use the Extract Transform Load (ETL) model.

A DW receives data from different data sources which is then processed in a staging area before being ingested to the actual warehouse (a database) and arranged as needed. DWs may then feed data to separate Data Marts; smaller database systems which end users may use for different purposes.


BigQuery


BigQuery (BQ) is a Data Warehouse solution offered by Google Cloud Platform.

- BQ is serverless. There are no servers to manage or database software to install; this is managed by Google and it's transparent to the customers.
- BQ is scalable and has high availability. Google takes care of the underlying software and infrastructure.
- BQ has built-in features like Machine Learning, Geospatial Analysis and Business Intelligence among others.
- BQ maximizes flexibility by separating the compute engine that analyzes the data from the storage, thus allowing customers to budget accordingly and reduce costs.

Some alternatives to BigQuery from other cloud providers would be AWS Redshift or Azure Synapse Analytics.


Preparing Dataset Files


wget (source --> gcs folder)

Source: https://github.com/DataTalksClub/nyc-tlc-data/releases

Files : yellow_tripdata_2019-*.csv.gz & yellow_tripdata_2020-*.csv.gz

Activate Cloud Shell



# download dataset files

wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-01.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-02.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-03.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-04.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-05.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-06.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-07.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-08.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-09.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-10.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-11.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-12.csv.gz

wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-01.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-02.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-03.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-04.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-05.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-06.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-07.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-08.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-09.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-10.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-11.csv.gz
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-12.csv.gz


gsutil cp (gcs folder to gcs bucket)

$ gsutil -m cp yellow*.csv.gz gs://de-zoomcamp-garjita-bucket/trip_data



External tables


BigQuery supports a few external data sources: you may query these sources directly from BigQuery even though the data itself isn't stored in BQ.

An external table is a table that acts like a standard BQ table. The table metadata (such as the schema) is stored in BQ storage but the data itself is external.

You may create an external table from a CSV or Parquet file stored in a Cloud Storage bucket:

CREATE OR REPLACE EXTERNAL TABLE `nytaxi.external_yellow_tripdata`
OPTIONS (
  format = 'CSV',
  uris = ['gs://de-zoomcamp-garjita-bucket/trip_data/yellow_tripdata_2019-*.csv.gz', 'gs://de-zoomcamp-garjita-bucket/trip_data/yellow_tripdata_2020-*.csv.gz']
);
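
Once created, the external table can be queried like any regular table while the data stays in the bucket; for example, a quick row count:

SELECT COUNT(*) FROM nytaxi.external_yellow_tripdata;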




Regular Internal Table


You may import an external table into BQ as a regular internal table by copying the contents of the external table into a new internal table. For example:

CREATE OR REPLACE TABLE nytaxi.yellow_tripdata_non_partitoned AS
SELECT * FROM nytaxi.external_yellow_tripdata;



Partitions


When we create a dataset, we generally have one or more columns that are used as some type of filter (usually columns in the WHERE clause). In this case, we can partition a table based on such columns to improve BigQuery's performance. In this lesson, the instructor shows an example of a dataset containing StackOverflow questions (left) and how the dataset would look if it were partitioned by the Creation_date field (right).

Partitioning is a powerful feature of BigQuery. Suppose we want to query the questions created on a specific date. Partitioning improves processing because BigQuery will not read or process any data from other dates. This improves efficiency and reduces querying costs.


BQ tables can be partitioned into multiple smaller tables. For example, if we often filter queries based on date, we could partition a table based on date so that we only query a specific sub-table based on the date we're interested in.

Partitioned tables are very useful to improve performance and reduce costs, because BQ will not process as much data per query.

You may partition a table by:

Time-unit column: tables are partitioned based on a TIMESTAMP, DATE, or DATETIME column in the table.
Ingestion time: tables are partitioned based on the timestamp when BigQuery ingests the data.
Integer range: tables are partitioned based on an integer column.

For Time-unit and Ingestion time columns, the partition may be daily (the default option), hourly, monthly or yearly.

Note: BigQuery limits the amount of partitions to 4000 per table. If you need more partitions, consider clustering as well.
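
The example below partitions by date; for the integer-range option, a hypothetical sketch (table name and bucket range chosen purely for illustration) could look like this:

CREATE OR REPLACE TABLE nytaxi.yellow_tripdata_partitioned_by_vendorid
PARTITION BY RANGE_BUCKET(VendorID, GENERATE_ARRAY(1, 10, 1)) AS
SELECT * FROM nytaxi.external_yellow_tripdata;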

Here's an example query for creating a partitioned table:

CREATE OR REPLACE TABLE nytaxi.yellow_tripdata_partitoned
PARTITION BY
  DATE(tpep_pickup_datetime) AS
SELECT * FROM nytaxi.external_yellow_tripdata;


 
BQ will identify partitioned tables with a specific icon. The Details tab of the table will specify the field which was used for partitioning the table and its datatype.

Querying a partitioned table is identical to querying a non-partitioned table, but the amount of processed data may be drastically different. Here are 2 identical queries to the non-partitioned and partitioned tables we created in the previous queries:

SELECT DISTINCT(VendorID)
FROM nytaxi.yellow_tripdata_non_partitoned
WHERE DATE(tpep_pickup_datetime) BETWEEN '2019-06-01' AND '2019-06-30';


- Query to non-partitioned table.
- It will process around 1.6GB of data.



SELECT DISTINCT(VendorID)
FROM nytaxi.yellow_tripdata_partitoned
WHERE DATE(tpep_pickup_datetime) BETWEEN '2019-06-01' AND '2019-06-30';


- Query to partitioned table.
- It will process around 106MB of data.

You may check the amount of rows of each partition in a partitioned table with a query such as this:

SELECT table_name, partition_id, total_rows
FROM `nytaxi.INFORMATION_SCHEMA.PARTITIONS`
WHERE table_name = 'yellow_tripdata_partitoned'
ORDER BY total_rows DESC;

This is useful to check if there are data imbalances and/or biases in your partitions.



Clustering


We can cluster tables based on some field. In the StackOverflow example presented by the instructor, after partitioning questions by date, we may want to cluster them by tag in each partition. Clustering also helps us to reduce our costs and improve query performance. The field that we choose for clustering depends on how the data will be queried.


Clustering consists of rearranging a table based on the values of its columns so that the table is ordered according to some criteria. Clustering can be done on one or multiple columns, up to 4; the order in which the clustering columns are specified determines the column priority.

Clustering may improve performance and lower costs on big datasets for certain types of queries, such as queries that use filter clauses and queries that aggregate data.

Note: tables with less than 1GB don't show significant improvement with partitioning and clustering; doing so in a small table could even lead to increased cost due to the additional metadata reads and maintenance needed for these features.

Clustering columns must be top-level, non-repeated columns. The following datatypes are supported:
  • DATE
  • BOOL
  • GEOGRAPHY
  • INT64
  • NUMERIC
  • BIGNUMERIC
  • STRING
  • TIMESTAMP
  • DATETIME
A partitioned table can also be clustered. Here's an example query for creating a partitioned and clustered table:

CREATE OR REPLACE TABLE nytaxi.yellow_tripdata_partitoned_clustered
PARTITION BY DATE(tpep_pickup_datetime)
CLUSTER BY VendorID AS
SELECT * FROM nytaxi.external_yellow_tripdata;



Just like for partitioned tables, the Details tab for the table will also display the fields by which the table is clustered.



Here are 2 identical queries, one for a partitioned table and the other for a partitioned and clustered table:

SELECT count(*) as trips
FROM nytaxi.yellow_tripdata_partitoned
WHERE DATE(tpep_pickup_datetime) BETWEEN '2019-06-01' AND '2020-12-31'
  AND VendorID=1;

- Query to non-clustered, partitioned table.
- This will process about 1.1GB of data.


SELECT count(*) as trips
FROM nytaxi.yellow_tripdata_partitoned_clustered
WHERE DATE(tpep_pickup_datetime) BETWEEN '2019-06-01' AND '2020-12-31'
  AND VendorID=1;

- Query to partitioned and clustered data.
- This will process about 865MB of data.



Partitioning vs Clustering

As mentioned before, you may combine both partitioning and clustering in a table, but there are important differences between both techniques that you need to be aware of in order to decide what to use for your specific scenario:


You may choose clustering over partitioning when partitioning results in a small amount of data per partition, when partitioning would result in over 4000 partitions or if your mutation operations modify the majority of partitions in the table frequently (for example, writing to the table every few minutes and writing to most of the partitions each time rather than just a handful).

BigQuery has automatic reclustering: when new data is written to a table, it can be written to blocks that contain key ranges that overlap with the key ranges in previously written blocks, which weakens the sort property of the table. BQ will perform automatic reclustering in the background to restore the sort properties of the table.

- For partitioned tables, clustering is maintained for data within the scope of each partition.


Best practices


Here's a list of best practices for BigQuery:

- Cost reduction
    - Avoid SELECT * . Reducing the amount of columns to display will drastically reduce the amount of processed data and lower costs.
    - Price your queries before running them.
    - Use clustered and/or partitioned tables if possible.
    - Use streaming inserts with caution. They can easily increase cost.
    - Materialize query results in different stages.
- Query performance
    - Filter on partitioned columns.
    - Denormalize data.
    - Use nested or repeated columns.
    - Use external data sources appropriately. Constantly reading data from a bucket may incur additional costs and has worse performance.
    - Reduce data before using a JOIN.
    - Do not treat WITH clauses as prepared statements.
    - Avoid oversharding tables.
    - Avoid JavaScript user-defined functions.
    - Use approximate aggregation functions (such as the HyperLogLog++-based ones) rather than exact ones (see the example after this list).
    - ORDER BY statements should be the last part of the query.
    - Optimize join patterns.
    - Place the table with the largest number of rows first, followed by the table with the fewest rows, and then place the remaining tables by decreasing size.
        - This is due to how BigQuery works internally: the first table will be distributed evenly and the second table will be broadcasted to all the nodes. Check the Internals section for more details.
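
As an example for the approximate aggregation point above, here is a quick sketch comparing an exact and an approximate distinct count on the partitioned table created earlier:

-- exact distinct count: precise, but more expensive on large tables
SELECT COUNT(DISTINCT VendorID) FROM nytaxi.yellow_tripdata_partitoned;

-- approximate distinct count (based on HyperLogLog++): cheaper, with a small error margin
SELECT APPROX_COUNT_DISTINCT(VendorID) FROM nytaxi.yellow_tripdata_partitoned;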


Machine Learning with BigQuery


Introduction to BigQuery ML


BigQuery ML is a BQ feature which allows us to create and execute Machine Learning models using standard SQL queries, without additional knowledge of Python nor any other programming languages and without the need to export data into a different system.

The pricing for BigQuery ML is slightly different and more complex than regular BigQuery. Some resources are free of charge up to a specific limit as part of the Google Cloud Free Tier. You may check the current pricing in this link.

BQ ML offers a variety of ML models depending on the use case, as the image below shows:


We will now create a few example queries to show how BQ ML works. Let's begin with creating a custom table:

CREATE OR REPLACE TABLE `nytaxi.yellow_tripdata_ml` (
  `passenger_count` INTEGER,
  `trip_distance` FLOAT64,
  `PULocationID` STRING,
  `DOLocationID` STRING,
  `payment_type` STRING,
  `fare_amount` FLOAT64,
  `tolls_amount` FLOAT64,
  `tip_amount` FLOAT64
) AS (
  SELECT passenger_count, trip_distance, CAST(PULocationID AS STRING), CAST(DOLocationID AS STRING), CAST(payment_type AS STRING), fare_amount, tolls_amount, tip_amount
  FROM nytaxi.yellow_tripdata_partitoned
  WHERE fare_amount != 0
);



- BQ supports feature preprocessing, both manual and automatic.
- A few columns such as PULocationID are categorical in nature but are represented with integer numbers in the original table. We cast them as strings in order to get BQ to automatically preprocess them as categorical features that will be one-hot encoded.
- Our target feature for the model will be tip_amount. The WHERE clause drops all records where fare_amount equals zero in order to improve training.

Let's now create a simple linear regression model with default settings:

CREATE OR REPLACE MODEL nytaxi.tip_model
OPTIONS (
  model_type='linear_reg',
  input_label_cols=['tip_amount'],
  DATA_SPLIT_METHOD='AUTO_SPLIT'
) AS
SELECT
  *
FROM
  nytaxi.yellow_tripdata_ml
WHERE
  tip_amount IS NOT NULL;



 


- The CREATE MODEL clause will create the nytaxi.tip_model model
- The OPTIONS() clause contains all of the necessary arguments to create our model.
    - model_type='linear_reg' is for specifying that we will create a linear regression model.
    - input_label_cols=['tip_amount'] lets BQ know that our target feature is tip_amount. For linear regression models, target features must be real numbers.
    - DATA_SPLIT_METHOD='AUTO_SPLIT' is for automatically splitting the dataset into train/test datasets.
- The SELECT statement indicates which features need to be considered for training the model.
    - Since we already created a dedicated table with all of the needed features, we simply select them all.
- Running this query may take several minutes.
- After the query runs successfully, the BQ explorer in the side panel will show all available models (just one in our case) with a special icon. Selecting a model will open a new tab with additional info such as model details, training graphs and evaluation metrics.

We can also get a description of the features with the following query:

SELECT * FROM ML.FEATURE_INFO(MODEL `nytaxi.tip_model`);

- The output will be similar to describe() in Pandas.

Model evaluation against a separate dataset is as follows:

SELECT
  *
FROM
ML.EVALUATE(
  MODEL `nytaxi.tip_model`, (
    SELECT
      *
    FROM
      `nytaxi.yellow_tripdata_ml`
    WHERE
      tip_amount IS NOT NULL
  )
);


- This will output similar metrics to those shown in the model info tab but with the updated values for the evaluation against the provided dataset.
- In this example we evaluate with the same dataset we used for training the model, so this is a silly example for illustration purposes.

The main purpose of a ML model is to make predictions. A ML.PREDICT statement is used for doing them:

SELECT
  *
FROM
ML.PREDICT(
  MODEL `nytaxi.tip_model`, (
    SELECT
      *
    FROM
      `nytaxi.yellow_tripdata_ml`
    WHERE
      tip_amount IS NOT NULL
  )
);


- The SELECT statement within ML.PREDICT provides the records for which we want to make predictions.
- Once again, we're using the same dataset we used for training to calculate predictions, so we already know the actual tips for the trips, but this is just an example.

Additionally, BQ ML has a special ML.EXPLAIN_PREDICT statement that will return the prediction along with the most important features that were involved in calculating the prediction for each of the records we want predicted.

SELECT
  *
FROM
ML.EXPLAIN_PREDICT(
  MODEL `nytaxi.tip_model`,(
    SELECT
      *
    FROM
      `nytaxi.yellow_tripdata_ml`
    WHERE
      tip_amount IS NOT NULL
  ), STRUCT(3 as top_k_features)
);



- This will return a similar output to the previous query but for each prediction, 3 additional rows will be provided with the most significant features along with the assigned weights for each feature.
  
Just like in regular ML models, BQ ML models can be improved with hyperparameter tuning. Here's an example query for tuning:

CREATE OR REPLACE MODEL `nytaxi.tip_hyperparam_model`
OPTIONS (
  model_type='linear_reg',
  input_label_cols=['tip_amount'],
  DATA_SPLIT_METHOD='AUTO_SPLIT',
  num_trials=5,
  max_parallel_trials=2,
  l1_reg=hparam_range(0, 20),
  l2_reg=hparam_candidates([0, 0.1, 1, 10])
) AS
SELECT
*
FROM
`nytaxi.yellow_tripdata_ml`
WHERE
tip_amount IS NOT NULL;



--> *Solved by creating a new project & dataset in a new region. Here I use the EU region.*

- We create a new model as normal but we add the num_trials option as an argument.
- All of the regular arguments used for creating a model are available for tuning. In this example we opt to tune the L1 and L2 regularizations.

All of the necessary reference documentation is available in [this link](https://cloud.google.com/bigquery/docs/reference/libraries-overview).



BigQuery ML deployment


ML models created within BQ can be exported and deployed to Docker containers running TensorFlow Serving.

The following steps are based on this official tutorial. All of these commands are to be run from a terminal and the gcloud sdk must be installed.

1. Authenticate to your GCP project.

gcloud auth login

2. Export the model to a Cloud Storage bucket.

bq --project_id my-project-multi-region extract -m nytaxi_eu.tip_model gs://taxi_ml_model63/tip_model

3. Download the exported model files to a temporary directory.

mkdir /tmp/model
gsutil cp -r gs://taxi_ml_model63/tip_model /tmp/model

4. Create a version subdirectory

mkdir -p serving_dir/tip_model/1
cp -r /tmp/model/tip_model/* serving_dir/tip_model/1

# Optionally you may erase the temporary directory
rm -r /tmp/model

5. Pull the TensorFlow Serving Docker image

docker pull tensorflow/serving

6. Run the Docker image. Mount the version subdirectory as a volume and provide a value for the MODEL_NAME environment variable.

# Make sure you don't mess up the spaces!
docker run \
  -p 8501:8501 \
  --mount type=bind,source=`pwd`/serving_dir/tip_model,target=/models/tip_model \
  -e MODEL_NAME=tip_model \
  -t tensorflow/serving &

7. With the image running, run a prediction with curl, providing values for the features used for the predictions.

curl \
  -d '{"instances": [{"passenger_count":1, "trip_distance":12.2, "PULocationID":"193", "DOLocationID":"264", "payment_type":"2","fare_amount":20.4,"tolls_amount":0.0}]}' \
  -X POST http://localhost:8501/v1/models/tip_model:predict


Output Sample :

{
    "predictions": [[0.2497064033370151]
    ]
}


MLOps Zoomcamp 2024 – Module 3

Module 3: Orchestration Source mlops-zoomcamp/03-orchestration at main · DataTalksClub/mlops-zoomcamp (github.com) Homework The goal of this...