Sunday, June 9, 2024

MLOps Zoomcamp 2024 – Module 3

Module 3: Orchestration

Source

mlops-zoomcamp/03-orchestration at main · DataTalksClub/mlops-zoomcamp (github.com)

Homework

The goal of this homework is to train a simple model for predicting the duration of a ride, but use Mage for it.

We'll use the same NYC taxi dataset, the Yellow taxi data for 2023.

Question 1. Run Mage

First, let's run Mage with Docker Compose. Follow the quick start guide.

What's the version of Mage we run?

(You can see it in the UI)

Answer of Question 1: v0.9.71

Question 2. Creating a project

Now let's create a new project. We can call it "homework_03", for example.

How many lines are in the created metadata.yaml file?

  • 35
  • 45
  • 55
  • 65

Solution

  docker exec -it mlops-magic-platform-1 bash
  root@4c0edc9c9a86:/home/src# cd mlops
  root@4c0edc9c9a86:/home/src/mlops# mage init homework_03
  root@4c0edc9c9a86:/home/src/mlops# cd homework_03
  root@4c0edc9c9a86:/home/src/mlops/homework_03# wc -l metadata.yaml
  55 metadata.yaml

Answer of Question 2: 55

Question 3. Creating a pipeline

Let's create an ingestion code block.

In this block, we will read the March 2023 Yellow taxi trips data.

How many records did we load?

  • 3,003,766
  • 3,203,766
  • 3,403,766
  • 3,603,766

Solution

    import requests
    from io import BytesIO
    from typing import List

    import pandas as pd
    import numpy as np

    if 'data_loader' not in globals():
        from mage_ai.data_preparation.decorators import data_loader


    @data_loader
    def ingest_files(**kwargs) -> pd.DataFrame:
        dfs: List[pd.DataFrame] = []

        response = requests.get(
            'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-03.parquet'
        )

        df = pd.read_parquet(BytesIO(response.content))
        df['tpep_pickup_datetime_cleaned'] = df['tpep_pickup_datetime'].astype(np.int64) // 10**9
        dfs.append(df)

        return pd.concat(dfs)
image

Answer of Question 3: 3,403,766

Question 4. Data preparation

Let's use the same logic for preparing the data we used previously. We will need to create a transformer code block and put this code there.

This is what we used (adjusted for yellow dataset):

def read_dataframe(filename):
    df = pd.read_parquet(filename)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)

    df['duration'] = df.tpep_dropoff_datetime - df.tpep_pickup_datetime
    df.duration = df.duration.dt.total_seconds() / 60

    df = df[(df.duration >= 1) & (df.duration <= 60)]

    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)

    return df

Let's adjust it and apply to the data we loaded in question 3.

What's the size of the result?

  • 2,903,766
  • 3,103,766
  • 3,316,216
  • 3,503,766

Solution

import pandas as pd

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


@transformer
def transform(
    df: pd.DataFrame, **kwargs
) -> pd.DataFrame:
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)

    df['duration'] = df.tpep_dropoff_datetime - df.tpep_pickup_datetime
    df.duration = df.duration.dt.total_seconds() / 60

    df = df[(df.duration >= 1) & (df.duration <= 60)]

    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)

    return df
image

Answer of Question 4: 3,316,216

Question 5. Train a model

We will now train a linear regression model using the same code as in homework 1.

- Fit a dict vectorizer
- Train a linear regression with default parameters
- Use pick up and drop off locations separately, don't create a combination feature

Let's now use it in the pipeline. We will need to create another transformation block, and return both the dict vectorizer and the model.

What's the intercept of the model?

Hint: print the intercept_ field in the code block

  • 21.77
  • 24.77
  • 27.77
  • 31.77

Solution

from typing import Tuple
import pandas as pd
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer

@transformer
def transform_train_model(
    df: pd.DataFrame, **kwargs
) -> Tuple[pd.DataFrame, DictVectorizer, LinearRegression]:
    print("Starting the transform function")

    # Compute the duration in minutes
    df['duration'] = (df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']).dt.total_seconds() / 60
    print("Duration computed")

    # Filter the records to keep only those with duration between 1 and 60 minutes (inclusive)
    df_filtered = df[(df['duration'] >= 1) & (df['duration'] <= 60)].copy()
    print(f"Data filtered: {df_filtered.shape[0]} records")

    # Cast IDs to string after ensuring they are of object type
    df_filtered['PULocationID'] = df_filtered['PULocationID'].astype('object').astype(str)
    df_filtered['DOLocationID'] = df_filtered['DOLocationID'].astype('object').astype(str)
    print("IDs casted to string")

    # Prepare feature list of dictionaries
    dicts = df_filtered[['PULocationID', 'DOLocationID']].to_dict(orient='records')
    print("Converted to list of dictionaries")

    # Fit a dictionary vectorizer
    dv = DictVectorizer()
    X_train = dv.fit_transform(dicts)
    print(f"Dictionary vectorizer fitted: {X_train.shape}")

    # Prepare the target variable
    y_train = df_filtered['duration'].values
    print(f"Target variable prepared: {y_train.shape}")

    # Train a linear regression model
    lr = LinearRegression()
    lr.fit(X_train, y_train)
    print("Linear regression model trained")

    # Print the intercept of the model
    print(f"Model intercept: {lr.intercept_}")

    df = df_filtered

    # Return the dictionary vectorizer and the model
    return df, dv, lr
image

Answer of Question 5: 24.77

Pipeline of Answers 3, 4, 5

image

Question 6. Register the model

The model is trained, so let's save it with MLFlow.

If you run mage with docker-compose, stop it with Ctrl+C or

docker-compose down

Let's create a dockerfile for mlflow, e.g. mlflow.dockerfile:

FROM python:3.10-slim

RUN pip install mlflow==2.12.1

EXPOSE 5000

CMD [ \
    "mlflow", "server", \
    "--backend-store-uri", "sqlite:///home/mlflow/mlflow.db", \
    "--host", "0.0.0.0", \
    "--port", "5000" \
]

And add it to the docker-compose.yaml:

  mlflow:
    build:
      context: .
      dockerfile: mlflow.dockerfile
    ports:
      - "5000:5000"
    volumes:
      - "${PWD}/mlflow:/home/mlflow/"
    networks:
      - app-network

Note that app-network is the same network used by the mage and postgres containers. If you use a different compose file, adjust it accordingly.

We should already have mlflow==2.12.1 in requirements.txt in the mage project we created for the module. If you're starting from scratch, add it to your requirements.

Next, start the compose again and create a data exporter block.

In the block, we

- Log the model (linear regression)
- Save and log the artifact (dict vectorizer)

If you used the suggested docker-compose snippet, mlflow should be accessible at http://mlflow:5000.

Find the logged model and open its MLmodel file. What's the size of the model (the model_size_bytes field)?

  • 14,534
  • 9,534
  • 4,534
  • 1,534

Note: typically we do the last two steps in one code block.
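
For reference, below is a minimal sketch of such a data exporter block. It assumes the MLflow server from the compose snippet above is reachable at http://mlflow:5000, that the upstream block returns the (df, dv, lr) tuple from Question 5 and that Mage passes it through as a single argument; the experiment name is only an illustration.

import pickle
from typing import Tuple

import mlflow
import pandas as pd
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

# Assumption: the MLflow service name/port from the docker-compose snippet above
mlflow.set_tracking_uri('http://mlflow:5000')
mlflow.set_experiment('homework_03')  # illustrative experiment name


@data_exporter
def export_model(
    data: Tuple[pd.DataFrame, DictVectorizer, LinearRegression], **kwargs
) -> None:
    df, dv, lr = data

    with mlflow.start_run():
        # Log the trained linear regression model
        mlflow.sklearn.log_model(lr, artifact_path='model')

        # Save the dict vectorizer locally, then log it as an artifact
        with open('dict_vectorizer.pkl', 'wb') as f_out:
            pickle.dump(dv, f_out)
        mlflow.log_artifact('dict_vectorizer.pkl')

After a pipeline run, the logged model and its MLmodel file can be inspected in the MLflow UI (http://localhost:5000 on the host, given the 5000:5000 port mapping).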

Answer of Question 6: 9,534

Monday, April 1, 2024

Data Engineering Zoomcamp 2024 - Project 1

 

DE Zoomcamp 2024 - Project1

This repository contains a brief description of my DE Zoomcamp 2024 Project 1

Problem statement

Retailrocket has collected a large e-commerce dataset: a file with behaviour data (events.csv), files with item properties (item_properties.csv) and a file which describes the category tree (category_tree.csv). The data has been collected from a real-world e-commerce website. It is raw data, i.e. without any content transformations; however, all values are hashed due to confidentiality concerns. The purpose of publishing it is to motivate research in the field of recommender systems with implicit feedback. The goal of this project is to create a streamlined and efficient process for ingesting and analyzing this e-commerce data in the cloud by applying Data Engineering concepts.

About the Dataset

Retailrocket recommender system

The dataset consists of three context files:

  1. a file with behaviour data (events.csv)
  2. a file which describes the category tree (category_tree.csv)
  3. files with item properties (item_properties_part1.csv & item_properties_part2.csv)

The data has been collected from a real-world e-commerce website. It is raw data, i.e. without any content transformations; however, all values are hashed due to confidentiality concerns.

The behaviour data, i.e. events like clicks, add-to-carts and transactions, represents interactions collected over a period of 4.5 months. A visitor can generate three types of events: view, addtocart or transaction.
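
For a quick look at the behaviour data, a small pandas sketch like the one below (assuming events.csv has already been downloaded from Kaggle into the working directory) shows its columns and the split between the three event types.

import pandas as pd

# Behaviour data columns: timestamp (epoch milliseconds), visitorid, event, itemid, transactionid
events = pd.read_csv('events.csv')
print(events.head())

# Distribution of the three event types: view, addtocart, transaction
print(events['event'].value_counts())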

Technologies / Tools

  • Containerisation : Docker
  • Cloud : GCP
  • Infrastructure as code (IaC) : Terraform
  • Workflow orchestration : Mage-ai
  • Data Warehouse : BigQuery
  • Batch processing : pyspark SQL
  • IDE : VS Code, Jupyter Notebook
  • Language : Python
  • Visualisation : Google Looker Studio

Project Architecture

The end-to-end data pipeline includes the below steps:

  • The Kaggle dataset is downloaded into the Google VM.
  • The downloaded CSV files (raw) are converted to Parquet and uploaded to a folder in a Google Cloud Storage bucket, which serves as the data lake.
  • Next, the data is loaded into BigQuery with the same format and values as the GCS bucket files.
  • Finally, new tables are created from those original tables using Spark SQL, with correct data types, partitioned by month and clustered for optimised performance. These tables form the Data Warehouse (see the sketch after this list).
  • Spin up a Dataproc cluster (master and workers) and execute the PySpark jobs for product analysis purposes.
  • Configure Google Looker Studio to power dashboards from the BigQuery Data Warehouse tables.
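
As a rough illustration of the Spark SQL warehouse step for the events table, the sketch below reads the raw BigQuery table with the spark-bigquery connector, fixes the data types with Spark SQL and writes a month-partitioned, clustered events_dwh table. It is not the project's exact job: the project, dataset and bucket names are placeholders, the clustering field is only an example, and it assumes the raw timestamp column is stored as epoch milliseconds.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('retailrocket-dwh').getOrCreate()

# Read the raw events table previously loaded into BigQuery
events = (
    spark.read.format('bigquery')
    .option('table', '<project_id>.<dataset>.events')
    .load()
)
events.createOrReplaceTempView('events')

# Spark SQL: cast the epoch-millisecond timestamp to a proper TIMESTAMP column
events_dwh = spark.sql("""
    SELECT
        CAST(`timestamp` / 1000 AS TIMESTAMP) AS event_time,
        visitorid,
        event,
        itemid,
        transactionid
    FROM events
""")

# Write back to BigQuery, partitioned by month and clustered for faster scans
(
    events_dwh.write.format('bigquery')
    .option('table', '<project_id>.<dataset>.events_dwh')
    .option('temporaryGcsBucket', '<bucket_name>')
    .option('partitionField', 'event_time')
    .option('partitionType', 'MONTH')
    .option('clusteredFields', 'event')
    .mode('overwrite')
    .save()
)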

You can find the detailed Architecture on the diagram below:

image

Reproducing from Scratch

Setup GCP

  • Create GCP Account.
  • Setup New Project and write down your Project ID.
  • Configure Service Account to get access to the project and download auth-keys (.json). Change auth-keys name if required.
    Please provide the service account the permissions below (sorted by name):
  1. BigQuery Admin
  2. Cloud SQL Client
  3. Compute Admin
  4. Compute Engine Service Agent
  5. Compute Network Admin
  6. Compute Storage Admin
  7. Dataproc Service Agent
  8. Editor
  9. Logs Bucket Writer
  10. Owner
  11. Storage Admin
  12. Storage Object Admin
  • Enable the following options under the APIs and services section:
  1. Identity and Access Management (IAM) API
  2. IAM service account credentials API
  3. Cloud Dataproc API
  4. Compute Engine API (if you are going to use VM instance)

Terraform as Infrastructure as Code (IaC) to build infrastructure

  • Download Terraform from here: https://www.terraform.io/downloads
  • Under the terraform folder, create the files main.tf (required) and variables.tf (optional) to store Terraform variables.
  • main.tf contains the following resources to be applied:
  1. Google Provider Versions
  2. resource "google_service_account"
  3. resource "google_project_iam_member"
  4. resource "google_compute_firewall"
  5. resource "google_storage_bucket"
  6. resource "google_storage_bucket_iam_member"
  7. resource "google_bigquery_dataset"
  8. resource "google_dataproc_cluster" (cluster_config : master_config, worker_config, software_config : image_version = "2.2.10-debian12"
      optional_components   = ["DOCKER", "JUPYTER"])
  • terraform init or terraform init -upgrade: initializes the directory, downloads the necessary plugins for the configured provider, and prepares it for use.
  • terraform plan : to see the execution plan
  • terraform apply : to apply the changes

If you would like to remove your stack from the Cloud, use the terraform destroy command.

Reproducibility

  • Assign External IP Addresses. After terraform apply completes successfully, assign an External IP Address to the master and worker instances using the Console: VM Instances (Compute Engine) --> SSH.
  image
  • Set up Mage-ai, PostgreSQL and pgAdmin through the master VM instance SSH. Copy repositories.sh into the VM. repositories.sh is a script that creates the Docker network and brings up the Mage-ai, PostgreSQL and pgAdmin containers.
  #############Install Docker network#############
  #create a network most containers will use
  sudo docker network create dockernet >> /root/dockernet.log
  sudo docker network ls >> /root/dockernet.log

  #############Bring up docker containers############
  cat > /root/docker-compose.yml <<- "SCRIPT"

  version: '3'
  services:
    magic:
      image: mageai/mageai:latest
      command: mage start dezoomcamp
      container_name: dezoomcamp-mage
      build:
        context: .
        dockerfile: Dockerfile
      environment:
        USER_CODE_PATH: /home/src/dezoomcamp
        POSTGRES_DBNAME: dezoomcampdb
        POSTGRES_SCHEMA: public
        POSTGRES_USER: postgres
        POSTGRES_PASSWORD: postgres316
        POSTGRES_HOST: vm-ikg-dezoomcamp
        POSTGRES_PORT: 5432
      ports:
        - 6789:6789
      volumes:
        - .:/home/src/
        - /root/.google/credentials/key-ikg-dezoomcamp-2024.json
      restart: on-failure:5
    postgres:
      image: postgres:14
      restart: on-failure
      container_name: dezoomcamp-postgres
      environment:
        POSTGRES_DB: dezoomcampdb
        POSTGRES_USER: postgres
        POSTGRES_PASSWORD: postgres316
      ports:
        - 5432:5432
    pgadmin:
      image: dpage/pgadmin4
      container_name: dezoomcamp-pgadmin
      environment:
        - PGADMIN_DEFAULT_EMAIL=admin@admin.com
        - PGADMIN_DEFAULT_PASSWORD=root
      ports:
        - 8080:80

  SCRIPT

  sudo docker compose -f /root/docker-compose.yml up -d

chmod +x repositories.sh

sudo ./repositories.sh

==> Mage-ai, PostgreSQL and pgAdmin should now be installed and running.

Check mage :

image

Check pgadmin :

image

Restart Jupyter Notebook

Stop :
```
sudo systemctl stop jupyter
```

Start by using port 8888 :
```
jupyter-notebook  --port=8888 --ip=0.0.0.0 --no-browser
```
Note: we use 0.0.0.0 just for demo purpose. Don't use this in production!

![image](https://github.com/garjita63/retailrocket-ecommerce-batch/assets/77673886/e78fc04d-9055-4aeb-ac27-5b877a99e1ec)
  • Increase memory size for cluster if required, and then restart Jupyter Notebook
  jupyter notebook --generate-config

Open /home/<username>/.jupyter/jupyter_notebook_config.py

Edit the file and modify the parameter c.NotebookApp.max_buffer_size

image
  • Spark master and worker clusters. Edit the ~/.bashrc file and add the line below, then reload the profile and locate the start script:
  export PATH=$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH
  source ~/.bashrc
  which start-all.sh
  Start the master and worker clusters:
  start-all.sh
  Try running Spark by using the dataset on HDFS. Copy the dataset folder into /user/<some_folder>:
  hdfs dfs -mkdir /user/<some_folder>
  hdfs dfs -copyFromLocal ecommerce-dataset/ /user/<some_folder>
  Log in to the master cluster web UI:
  image
  Log in to the worker cluster web UI:
  image

Mage-ai orchestration pipelines

Create two runtime variables: bucket_name and dataset. These variables are used by all pipeline blocks (see the sketch below).

image
image
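
Inside a block, Mage exposes pipeline runtime variables through the keyword arguments, so any block can read them roughly as in the sketch below (the GCS path is only illustrative).

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_data(*args, **kwargs):
    # Runtime variables defined for the pipeline arrive in kwargs
    bucket_name = kwargs['bucket_name']
    dataset = kwargs['dataset']

    # Illustrative use: build the GCS location the other blocks work against
    gcs_path = f'gs://{bucket_name}/{dataset}'
    print(f'Using {gcs_path}')
    return gcs_path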

All pipelines and their blocks are available in the mage-project1.tar file.

  • Put mage-project1.tar into the VM. You must have permission to copy files to the master VM.
  gcloud auth login
  gcloud config set project <project_name>
  --> allow access to your Google account
  gcloud compute scp mage-project1.tar <username>@<project_name>:/home/<username>
  • Open SSH on Master instance
  • Copy mage-project1.tar into the mage container (in this project named: dezoomcamp-mage)
  docker cp mage-project1.tar <mage_container>:/home/src/<some_folder>   # in this project: "dezoomcamp"
  • Go to mage container
  docker exec -it <mage_container> bash
  • Extract (untar) the mage-project1.tar file
  cd /home/src/<some_folder>
  tar -xvf mage-project1.tar
  • Open the Mage application website: http://<external_ip>:6789
image

BigQuery Tables

image

events preview

image

events_dwh preview

image

item_properties preview

image

item_properties_dwh preview

image

Dashboard

Top 10 Sold Items

image

Percentage of Events

image
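
For context, these two charts can be powered by queries along the following lines against the DWH table (the project and dataset names are placeholders, and the schema follows the assumed events_dwh sketch above).

from google.cloud import bigquery

client = bigquery.Client()

# Top 10 sold items: count transaction events per item
top_sold = client.query("""
    SELECT itemid, COUNT(*) AS transactions
    FROM `<project_id>.<dataset>.events_dwh`
    WHERE event = 'transaction'
    GROUP BY itemid
    ORDER BY transactions DESC
    LIMIT 10
""").to_dataframe()

# Percentage of events: share of view / addtocart / transaction
event_share = client.query("""
    SELECT
        event,
        COUNT(*) AS cnt,
        ROUND(100 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) AS pct
    FROM `<project_id>.<dataset>.events_dwh`
    GROUP BY event
""").to_dataframe()

print(top_sold)
print(event_share)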
