Published by: garjita on February 5, 2024

Hands-On Learning

Source:

https://github.com/DataTalksClub/data-engineering-zoomcamp/tree/main/02-workflow-orchestration

Techs/Tools

Docker . Python . Postgres . Mage . GCP (IAM & Admin, Cloud Storage, BigQuery) . Terraform

Intro to Orchestration


Intro to Orchestration.pptx

Intro to Mage

  • Mage platform introduction
  • The fundamental concepts behind Mage
  • Get started
  • Spin Mage up via Docker (see the quick-start command sketch after this list)
  • Run a simple pipeline

Intro to Mage.pptx
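
A minimal Docker quick-start sketch, following the command documented by Mage (the project name magic-zoomcamp is an assumption):

$ docker run -it -p 6789:6789 -v $(pwd):/home/src mageai/mageai \
  /app/run_app.sh mage start magic-zoomcamp

The Mage UI is then available at http://localhost:6789.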

Mage and Postgres on Docker Desktop

ETL: API to Postgres

  • Build a simple ETL pipeline that loads data from an API into a Postgres database
  • The database will be built using Docker (a minimal docker-compose sketch follows this list)
  • It will be running locally, but it’s the same as if it were running in the cloud.
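
Since the pipeline needs both Mage and Postgres, here is a docker-compose sketch loosely modeled on the course setup (image tags, service names, and credentials are assumptions):

version: '3'
services:
  magic:
    image: mageai/mageai:latest
    ports:
      - 6789:6789
    volumes:
      - .:/home/src/
    depends_on:
      - postgres
  postgres:
    image: postgres:14
    restart: on-failure
    ports:
      - 5432:5432
    environment:
      POSTGRES_DB: postgres
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres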

Resources

Taxi Dataset https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz

  • Building Pipeline

Pipeline Tree

Blocks List

load_taxi_data

import io
import pandas as pd
import requests

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data_from_api(*args, **kwargs):
    """
    Template for loading data from API
    """
    url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz'

    # Declare explicit dtypes so pandas doesn't have to infer them
    taxi_dtypes = {
        'VendorID': pd.Int64Dtype(),
        'passenger_count': pd.Int64Dtype(),
        'trip_distance': float,
        'RatecodeID': pd.Int64Dtype(),
        'store_and_fwd_flag': str,
        'PULocationID': pd.Int64Dtype(),
        'DOLocationID': pd.Int64Dtype(),
        'payment_type': pd.Int64Dtype(),
        'fare_amount': float,
        'extra': float,
        'mta_tax': float,
        'tip_amount': float,
        'tolls_amount': float,
        'improvement_surcharge': float,
        'total_amount': float,
        'congestion_surcharge': float
    }
    # Parse the pickup/dropoff timestamps as datetimes
    parse_dates = ['tpep_pickup_datetime', 'tpep_dropoff_datetime']

    return pd.read_csv(url, sep=',', compression='gzip', dtype=taxi_dtypes, parse_dates=parse_dates)


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'

transform_taxi_data

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@transformer
def transform(data, *args, **kwargs):
    # Report how many rows will be dropped before filtering
    print("Rows with zero passengers:", data['passenger_count'].isin([0]).sum())
    return data[data['passenger_count'] > 0]


@test
def test_output(output, *args):
    assert output['passenger_count'].isin([0]).sum() == 0, 'There are rides with zero passengers'

taxi_data_to_pg

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
    schema_name = 'ny_taxi'  # Specify the name of the schema to export data to
    table_name = 'yellow_cab_data'  # Specify the name of the table to export data to
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'dev'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,  # Specifies whether to include index in exported table
            if_exists='replace',  # Specify resolution policy if table name already exists
        )
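
The 'dev' profile referenced above lives in io_config.yaml. A minimal sketch of what it might contain, assuming the Postgres container from the docker-compose setup (host, credentials, and database name are assumptions):

dev:
  POSTGRES_CONNECT_TIMEOUT: 10
  POSTGRES_DBNAME: postgres
  POSTGRES_SCHEMA: ny_taxi
  POSTGRES_USER: postgres
  POSTGRES_PASSWORD: postgres
  POSTGRES_HOST: postgres  # the docker-compose service name
  POSTGRES_PORT: 5432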

sql_taxi_data

SELECT count(*) FROM ny_taxi.yellow_cab_data;
  • API to Postgres Pipeline Execution

ETL: API to GCS

In this tutorial, we'll walk through the process of using Mage to extract, transform, and load data from an API to Google Cloud Storage (GCS). We'll cover writing both partitioned and unpartitioned data to GCS and discuss why you might prefer one over the other. Many data teams start by extracting data from a source and writing it to a data lake before loading it into a structured data store, like a database.

  • Building Pipeline

Pipeline Tree

Blocks List

api_to_gcs

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
from os import path

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_from_google_cloud_storage(*args, **kwargs):
    """
    Template for loading data from a Google Cloud Storage bucket.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: https://docs.mage.ai/design/data-loading#googlecloudstorage
    """
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'
    bucket_name = 'mage-zoomcamp-ketut-1'
    object_key = 'yellow_tripdata_2021-01.csv'

    return GoogleCloudStorage.with_config(ConfigFileLoader(config_path, config_profile)).load(
        bucket_name,
        object_key,
    )


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'
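
Note that the api_to_gcs block above reads from the bucket; the write side isn't shown in this section. A minimal sketch of an unpartitioned export using Mage's GoogleCloudStorage exporter (bucket and object names are assumptions, reusing the ones above):

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_gcs(df: DataFrame, **kwargs) -> None:
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'
    bucket_name = 'mage-zoomcamp-ketut-1'  # assumption: same bucket as above
    object_key = 'yellow_tripdata_2021-01.parquet'  # written as a single Parquet object

    GoogleCloudStorage.with_config(ConfigFileLoader(config_path, config_profile)).export(
        df,
        bucket_name,
        object_key,
    )
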
  • API to GCS Pipeline Execution

ETL: GCS to BigQuery

Now that we’ve written data to GCS, let’s load it into BigQuery. In this section, we’ll walk through the process of using Mage to load our data from GCS to BigQuery. This closely mirrors a very common data engineering workflow: loading data from a data lake into a data warehouse.

Here I use a different source file format (.parquet), uploaded manually to the GCS bucket.

yellow_tripdata_2023-11.parquet

  • Building Pipeline

Pipeline Tree

Blocks List

extract_taxi_gcs

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
from os import path

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_from_google_cloud_storage(*args, **kwargs):
    """
    Template for loading data from a Google Cloud Storage bucket.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: https://docs.mage.ai/design/data-loading#googlecloudstorage
    """
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'
    bucket_name = 'de-zoomcamp-garjita-bucket'
    object_key = 'yellow_tripdata_2023-11.parquet'

    return GoogleCloudStorage.with_config(ConfigFileLoader(config_path, config_profile)).load(
        bucket_name,
        object_key,
    )


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'

load_to_bigquery

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.bigquery import BigQuery
from mage_ai.io.config import ConfigFileLoader
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_big_query(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to a BigQuery warehouse.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: https://docs.mage.ai/design/data-loading#bigquery
    """
    table_id = 'dtc-de-course-2024-411803.taxidataset.yellow_tripdata_2023-11'
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    BigQuery.with_config(ConfigFileLoader(config_path, config_profile)).export(
        df,
        table_id,
        if_exists='replace',  # Specify resolution policy if table name already exists
    )
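
To verify the load, a quick row count in BigQuery (a sketch; the table id comes from the block above and needs backticks because of the dashes):

SELECT count(*) FROM `dtc-de-course-2024-411803.taxidataset.yellow_tripdata_2023-11`;
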
  • GCS to BigQuery Pipeline Execution

Deploying to GCP with Terraform
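
The typical flow with the Mage Terraform templates looks something like this (a sketch; the template repository is the Mage project's, and its defaults may need editing for your own GCP project):

$ git clone https://github.com/mage-ai/mage-ai-terraform-templates.git
$ cd mage-ai-terraform-templates/gcp
$ terraform init
$ terraform apply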

After completing the installation and setup of Mage on GCP using Terraform, here are the results after running terraform apply.

module.lb-http.google_compute_backend_service.default["default"]: Creation complete after 59s [id=projects/dtc-de-course-2024-411803/global/backendServices/mage-data-gar-urlmap-backend-default]

module.lb-http.google_compute_url_map.default[0]: Creating...
module.lb-http.google_compute_url_map.default[0]: Still creating... [10s elapsed]
module.lb-http.google_compute_url_map.default[0]: Creation complete after 12s [id=projects/dtc-de-course-2024-411803/global/urlMaps/mage-data-gar-urlmap-url-map]
module.lb-http.google_compute_target_http_proxy.default[0]: Creating...
module.lb-http.google_compute_target_http_proxy.default[0]: Still creating... [10s elapsed]
module.lb-http.google_compute_target_http_proxy.default[0]: Creation complete after 12s [id=projects/dtc-de-course-2024-411803/global/targetHttpProxies/mage-data-gar-urlmap-http-proxy]
module.lb-http.google_compute_global_forwarding_rule.http[0]: Creating...
module.lb-http.google_compute_global_forwarding_rule.http[0]: Still creating... [10s elapsed]
module.lb-http.google_compute_global_forwarding_rule.http[0]: Still creating... [20s elapsed]
module.lb-http.google_compute_global_forwarding_rule.http[0]: Creation complete after 24s [id=projects/dtc-de-course-2024-411803/global/forwardingRules/mage-data-gar-urlmap]

Apply complete! Resources: 7 added, 0 changed, 1 destroyed.

Outputs:

service_ip = "34.49.166.36"

When finished, tear down the resources to avoid ongoing charges:

$ terraform destroy

Homework

Questions – Homework-02

Homework-02 Solution

  1. Pipeline Tree
  2. Blocks Script
  3. Execution
  4. GCS Bucket Files
  5. Pipeline Scheduling
  6. Answers

1. Pipeline Tree

hw02-pipeline-tree

2. Blocks Script

hw2-extract-taxi.py

import io
import pandas as pd
import requests

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data_from_api(*args, **kwargs):
    """
    Template for loading data from API
    """
    taxi_dtypes = {
        'VendorID': pd.Int64Dtype(),
        'passenger_count': pd.Int64Dtype(),
        'trip_distance': float,
        'RatecodeID': pd.Int64Dtype(),
        'store_and_fwd_flag': str,
        'PULocationID': pd.Int64Dtype(),
        'DOLocationID': pd.Int64Dtype(),
        'payment_type': pd.Int64Dtype(),
        'fare_amount': float,
        'extra': float,
        'mta_tax': float,
        'tip_amount': float,
        'tolls_amount': float,
        'improvement_surcharge': float,
        'total_amount': float,
        'congestion_surcharge': float
    }
    # Green taxi data uses lpep_* timestamp columns
    parse_dates = ["lpep_pickup_datetime", "lpep_dropoff_datetime"]

    # Read the list of source URLs, one per line
    with open('taxi_url.txt', 'r') as text:
        links = text.read().splitlines()

    # Load the first file, then append the rest
    df = pd.read_csv(links[0], sep=',', compression='gzip', dtype=taxi_dtypes, parse_dates=parse_dates)
    # Alternative: parse_dates=True with keep_date_col=True
    for url in links[1:]:
        df1 = pd.read_csv(url, sep=',', compression='gzip', dtype=taxi_dtypes, parse_dates=parse_dates)
        df = pd.concat([df, df1], sort=False)

    # Return output
    return df
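
For reference, taxi_url.txt holds one download URL per line. Assuming the final-quarter 2020 green taxi files used in this homework, it would look like:

https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2020-10.csv.gz
https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2020-11.csv.gz
https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2020-12.csv.gz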

hw2_transform_taxi.py

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@transformer
def transform(data, *args, **kwargs):
    """ Alternative using NumPy:
    data = data[np.logical_not(data['passenger_count'].isin([0]))]
    df = data[np.logical_not(data['trip_distance'].isin([0]))]
    return df
    """
    # Replace NaN values with 0 (zero) in passenger_count & trip_distance columns
    data['passenger_count'] = data['passenger_count'].fillna(0)
    data['trip_distance'] = data['trip_distance'].fillna(0)

    # Remove rows that have 0 (zero) values
    data = data[data['passenger_count'] != 0]
    data = data[data['trip_distance'] != 0]

    # Create a new column lpep_pickup_date by converting lpep_pickup_datetime to a date
    data['lpep_pickup_date'] = data['lpep_pickup_datetime'].dt.date

    # Rename column VendorID to vendor_id
    data.columns = data.columns.str.replace("VendorID", "vendor_id")

    # Return output
    return data

hw2_export_taxi_postgres.py

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to a PostgreSQL database.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: https://docs.mage.ai/design/data-loading#postgresql
    """
    # By default database_name = 'postgres' ==> io_config.yaml
    schema_name = 'mage'  # Specify the name of the schema to export data to
    table_name = 'green_taxi'  # Specify the name of the table to export data to
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'dev'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,  # Specifies whether to include index in exported table
            if_exists='replace',  # Specify resolution policy if table name already exists
        )

hw2_query_taxi.py

SELECT count(*) FROM mage.green_taxi;

hw2_export_to_gcs_partition.py

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
import os
import pandas as pd
from pandas import DataFrame
from os import path
import pyarrow as pa
import pyarrow.parquet as pq
import datetime as dt
from datetime import date

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

# Point the Google client libraries at the service account key
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "/home/src/<my key>.json"

project_id = 'dtc-de-course-2024-411803'
bucket_name = 'de-zoomcamp-garjita-bucket'
table_name = "green_taxi"
root_path = f'{bucket_name}/{table_name}'

config_path = path.join(get_repo_path(), 'io_config.yaml')
config_profile = 'default'


@data_exporter
def export_data(data, *args, **kwargs):
    # Partition column: the pickup date derived from the timestamp
    data['lpep_pickup_date'] = data['lpep_pickup_datetime'].dt.date

    # Write one folder per lpep_pickup_date value directly to GCS
    table = pa.Table.from_pandas(data)
    gcs = pa.fs.GcsFileSystem()
    pq.write_to_dataset(
        table,
        root_path=root_path,
        partition_cols=['lpep_pickup_date'],
        filesystem=gcs,
    )
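
To spot-check the partitioned layout, a minimal read-back sketch (assumes the same bucket and path as above; with hive-style partitioning the partition values come back as strings):

import pyarrow.fs
import pyarrow.parquet as pq

gcs = pyarrow.fs.GcsFileSystem()
one_day = pq.read_table(
    'de-zoomcamp-garjita-bucket/green_taxi',
    filesystem=gcs,
    filters=[('lpep_pickup_date', '=', '2020-10-01')],  # hypothetical partition value
)
print(one_day.num_rows)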

3. Execution

Execution result screenshots for each block:

  • hw2-extract-taxi.py
  • hw2_transform_taxi.py
  • hw2_export_taxi_postgres.py
  • hw2_query_taxi.py
  • hw2_export_to_gcs_partition.py

4. GCS Bucket Files



  • green_taxi table partitions list
  • $ gcloud storage ls gs://de-zoomcamp-garjita-bucket/green_taxi/
  1. gs://de-zoomcamp-garjita-bucket/green_taxi/
  2. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2009-01-01/
  3. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-09-30/
  4. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-01/
  5. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-02/
  6. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-03/
  7. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-04/
  8. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-05/
  9. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-06/
  10. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-07/
  11. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-08/
  12. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-09/
  13. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-10/
  14. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-11/
  15. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-12/
  16. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-13/
  17. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-14/
  18. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-15/
  19. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-16/
  20. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-17/
  21. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-18/
  22. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-19/
  23. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-20/
  24. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-21/
  25. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-22/
  26. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-23/
  27. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-24/
  28. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-25/
  29. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-26/
  30. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-27/
  31. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-28/
  32. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-29/
  33. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-30/
  34. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-31/
  35. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-01/
  36. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-02/
  37. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-03/
  38. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-04/
  39. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-05/
  40. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-06/
  41. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-07/
  42. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-08/
  43. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-09/
  44. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-10/
  45. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-11/
  46. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-12/
  47. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-13/
  48. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-14/
  49. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-15/
  50. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-16/
  51. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-17/
  52. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-18/
  53. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-19/
  54. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-20/
  55. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-21/
  56. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-22/
  57. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-23/
  58. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-24/
  59. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-25/
  60. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-26/
  61. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-27/
  62. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-28/
  63. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-29/
  64. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-30/
  65. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-01/
  66. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-02/
  67. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-03/
  68. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-04/
  69. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-05/
  70. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-06/
  71. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-07/
  72. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-08/
  73. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-09/
  74. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-10/
  75. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-11/
  76. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-12/
  77. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-13/
  78. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-14/
  79. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-15/
  80. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-16/
  81. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-17/
  82. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-18/
  83. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-19/
  84. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-20/
  85. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-21/
  86. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-22/
  87. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-23/
  88. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-24/
  89. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-25/
  90. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-26/
  91. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-27/
  92. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-28/
  93. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-29/
  94. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-30/
  95. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-31/
  96. gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2021-01-01/
  • Number of partition folders: 96 (screenshot)

5. Pipeline Scheduling

Pipeline scheduling configuration (screenshots).

6. Answers

Question 1. Data Loading

Once the dataset is loaded, what’s the shape of the data?

Answer 1 : 266,855 rows x 20 columns

Note: Available on execution result of hw2-extract-taxi.py block


Question 2. Data Transformation

Upon filtering the dataset where the passenger count is greater than 0 and the trip distance is greater than zero, how many rows are left?

Answer 2 : 139,370 rows

Note: Available on execution result of hw2_transform_taxi.py block


Question 3. Data Transformation

Which of the following creates a new column lpep_pickup_date by converting lpep_pickup_datetime to a date?

Answer 3 : data['lpep_pickup_date'] = data['lpep_pickup_datetime'].dt.date

Note: Used on hw2_transform_taxi.py block


Question 4. Data Transformation

What are the existing values of VendorID in the dataset?

Answer 4 : 1 or 2

Note:

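One way to check this in the transformer block (a sketch; the original note here was a screenshot):

print(sorted(data['VendorID'].dropna().unique()))  # expected: [1, 2]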

Question 5. Data Transformation

How many columns need to be renamed to snake case?

Answer 5 : 2

Note:

  1. lpep_pickup_datetime –> lpep_pickup_date
  2. VendorID –> vendor_id

Question 6. Data Exporting

Once exported, how many partitions (folders) are present in Google Cloud?

Answer 6 : 96

Note: (screenshot; see also the green_taxi partition folders list in section 4 above)
