Published by: garjita on February 5, 2024
Hands-On Learning
Source:
https://github.com/DataTalksClub/data-engineering-zoomcamp/tree/main/02-workflow-orchestration
Tech/Tools
Docker . Python . Postgres . Mage . GCP (IAM & Admin, Cloud Storage, BigQuery) . Terraform
Intro to Orchestration
Intro to Mage
- Mage platform introduction
- The fundamental concepts behind Mage
- Get started
- Spin Mage up via Docker
- Run a simple pipeline
Mage and Postgres on Docker Desktop
ETL: API to Postgres
- Build a simple ETL pipeline that loads data from an API into a Postgres database
- Database will be built using Docker
- It will be running locally, but it’s the same as if it were running in the cloud.
Resources
Taxi Dataset https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz
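Before diving into the Mage blocks, the load → transform → export flow they implement can be sketched with plain pandas (illustrative only: SQLite stands in for the Dockerized Postgres, and the tiny inline dataset is made up, not the taxi data):

```python
import sqlite3
import pandas as pd

# Extract: in the real pipeline this is pd.read_csv(url, ...) against the taxi CSV
df = pd.DataFrame({
    'passenger_count': [0, 1, 2],
    'total_amount': [5.0, 12.5, 20.0],
})

# Transform: drop rides with zero passengers, as the transformer block does
df = df[df['passenger_count'] > 0]

# Load: write to a database table (Mage's Postgres exporter does the equivalent via loader.export)
with sqlite3.connect(':memory:') as conn:
    df.to_sql('yellow_cab_data', conn, index=False, if_exists='replace')
    count = conn.execute('SELECT count(*) FROM yellow_cab_data').fetchone()[0]

print(count)  # 2 rows loaded
```

Each Mage block below maps onto one of these three steps.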
- Building Pipeline
Pipeline Tree
Blocks List
load_taxi_data
import io
import pandas as pd
import requests

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data_from_api(*args, **kwargs):
    """
    Template for loading data from an API.
    """
    url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz'
    taxi_dtypes = {
        'VendorID': pd.Int64Dtype(),
        'passenger_count': pd.Int64Dtype(),
        'trip_distance': float,
        'RatecodeID': pd.Int64Dtype(),
        'store_and_fwd_flag': str,
        'PULocationID': pd.Int64Dtype(),
        'DOLocationID': pd.Int64Dtype(),
        'payment_type': pd.Int64Dtype(),
        'fare_amount': float,
        'extra': float,
        'mta_tax': float,
        'tip_amount': float,
        'tolls_amount': float,
        'improvement_surcharge': float,
        'total_amount': float,
        'congestion_surcharge': float
    }
    parse_dates = ['tpep_pickup_datetime', 'tpep_dropoff_datetime']

    return pd.read_csv(url, sep=',', compression='gzip', dtype=taxi_dtypes, parse_dates=parse_dates)


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'
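A note on the dtype map above: `pd.Int64Dtype()` is pandas' nullable integer extension type, which (unlike plain `int`) can hold missing values, something the taxi CSVs contain in columns like passenger_count. A quick standalone illustration:

```python
import pandas as pd

# A plain int64 Series cannot hold NaN; the nullable Int64 extension type can
s = pd.Series([1, None, 3], dtype=pd.Int64Dtype())

print(s.dtype)          # Int64
print(s.isna().sum())   # 1 missing value, integers preserved for the rest
```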
transform_taxi_data
if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@transformer
def transform(data, *args, **kwargs):
    print("Rows with zero passengers:", data['passenger_count'].isin([0]).sum())
    return data[data['passenger_count'] > 0]


@test
def test_output(output, *args):
    assert output['passenger_count'].isin([0]).sum() == 0, 'There are rides with zero passengers'
taxi_data_to_pg
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
    schema_name = 'ny_taxi'  # Specify the name of the schema to export data to
    table_name = 'yellow_cab_data'  # Specify the name of the table to export data to
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'dev'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,  # Specifies whether to include index in exported table
            if_exists='replace',  # Specify resolution policy if table name already exists
        )
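The `if_exists='replace'` policy above mirrors pandas' `to_sql` semantics: the table is dropped and recreated on every run, whereas `'append'` adds rows to what is already there. A minimal illustration of the difference, with SQLite standing in for Postgres (assumption: Mage's exporter follows the same pandas-style semantics):

```python
import sqlite3
import pandas as pd

df = pd.DataFrame({'x': [1, 2, 3]})

with sqlite3.connect(':memory:') as conn:
    df.to_sql('t', conn, index=False, if_exists='replace')
    df.to_sql('t', conn, index=False, if_exists='replace')  # table recreated: still 3 rows
    replaced = conn.execute('SELECT count(*) FROM t').fetchone()[0]

    df.to_sql('t', conn, index=False, if_exists='append')   # rows added: now 6
    appended = conn.execute('SELECT count(*) FROM t').fetchone()[0]

print(replaced, appended)  # 3 6
```

`'replace'` keeps re-runs of the pipeline idempotent, which is why it is used throughout this write-up.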
sql_taxi_data
SELECT count(*) FROM ny_taxi.yellow_cab_data;
- API to Postgres Pipeline Execution
ETL: API to GCS
This tutorial walks through the process of using Mage to extract, transform, and load data from an API to Google Cloud Storage (GCS). It covers writing both partitioned and unpartitioned data to GCS and discusses why you might choose one over the other. Many data teams start by extracting data from a source and writing it to a data lake before loading it into a structured data store, such as a database.
- Building Pipeline
Pipeline Tree
Blocks List
api_to_gcs
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
from os import path

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_from_google_cloud_storage(*args, **kwargs):
    """
    Template for loading data from a Google Cloud Storage bucket.
    Specify your configuration settings in 'io_config.yaml'.
    Docs: https://docs.mage.ai/design/data-loading#googlecloudstorage
    """
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'
    bucket_name = 'mage-zoomcamp-ketut-1'
    object_key = 'yellow_tripdata_2021-01.csv'

    return GoogleCloudStorage.with_config(ConfigFileLoader(config_path, config_profile)).load(
        bucket_name,
        object_key,
    )


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'
- API to GCS Pipeline Execution
ETL: GCS to BigQuery
Now that we’ve written data to GCS, let’s load it into BigQuery. In this section, we’ll walk through the process of using Mage to load our data from GCS to BigQuery. This closely mirrors a very common data engineering workflow: loading data from a data lake into a data warehouse.
Here I use a different source file format (.parquet) that was uploaded manually to the GCS bucket.
yellow_tripdata_2023-11.parquet
- Building pipeline
Pipeline Tree
Blocks List
extract_taxi_gcs
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
from os import path

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_from_google_cloud_storage(*args, **kwargs):
    """
    Template for loading data from a Google Cloud Storage bucket.
    Specify your configuration settings in 'io_config.yaml'.
    Docs: https://docs.mage.ai/design/data-loading#googlecloudstorage
    """
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'
    bucket_name = 'de-zoomcamp-garjita-bucket'
    object_key = 'yellow_tripdata_2023-11.parquet'

    return GoogleCloudStorage.with_config(ConfigFileLoader(config_path, config_profile)).load(
        bucket_name,
        object_key,
    )


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'
load_to_bigquery
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.bigquery import BigQuery
from mage_ai.io.config import ConfigFileLoader
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_big_query(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to a BigQuery warehouse.
    Specify your configuration settings in 'io_config.yaml'.
    Docs: https://docs.mage.ai/design/data-loading#bigquery
    """
    table_id = 'dtc-de-course-2024-411803.taxidataset.yellow_tripdata_2023-11'
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    BigQuery.with_config(ConfigFileLoader(config_path, config_profile)).export(
        df,
        table_id,
        if_exists='replace',  # Specify resolution policy if table name already exists
    )
- GCS to BigQuery Pipeline Execution
Deploying to GCP with Terraform
After completing the installation and setup of Mage on GCP using Terraform, here are the results of running terraform apply:
module.lb-http.google_compute_backend_service.default["default"]: Creation complete after 59s [id=projects/dtc-de-course-2024-411803/global/backendServices/mage-data-gar-urlmap-backend-default]
module.lb-http.google_compute_url_map.default[0]: Creating...
module.lb-http.google_compute_url_map.default[0]: Still creating... [10s elapsed]
module.lb-http.google_compute_url_map.default[0]: Creation complete after 12s [id=projects/dtc-de-course-2024-411803/global/urlMaps/mage-data-gar-urlmap-url-map]
module.lb-http.google_compute_target_http_proxy.default[0]: Creating...
module.lb-http.google_compute_target_http_proxy.default[0]: Still creating... [10s elapsed]
module.lb-http.google_compute_target_http_proxy.default[0]: Creation complete after 12s [id=projects/dtc-de-course-2024-411803/global/targetHttpProxies/mage-data-gar-urlmap-http-proxy]
module.lb-http.google_compute_global_forwarding_rule.http[0]: Creating...
module.lb-http.google_compute_global_forwarding_rule.http[0]: Still creating... [10s elapsed]
module.lb-http.google_compute_global_forwarding_rule.http[0]: Still creating... [20s elapsed]
module.lb-http.google_compute_global_forwarding_rule.http[0]: Creation complete after 24s [id=projects/dtc-de-course-2024-411803/global/forwardingRules/mage-data-gar-urlmap]
Apply complete! Resources: 7 added, 0 changed, 1 destroyed.
Outputs:
service_ip = "34.49.166.36"
$ terraform destroy
Homework
Homework-02 Solution
- Pipeline Tree
- Blocks Script
- Execution
- GCS Bucket Files
- Pipeline Scheduling
- Answers
1. Pipeline Tree
2. Blocks Script
hw2-extract-taxi.py
import io
import pandas as pd
import requests

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data_from_api(*args, **kwargs):
    """
    Template for loading data from an API.
    """
    taxi_dtypes = {
        'VendorID': pd.Int64Dtype(),
        'passenger_count': pd.Int64Dtype(),
        'trip_distance': float,
        'RatecodeID': pd.Int64Dtype(),
        'store_and_fwd_flag': str,
        'PULocationID': pd.Int64Dtype(),
        'DOLocationID': pd.Int64Dtype(),
        'payment_type': pd.Int64Dtype(),
        'fare_amount': float,
        'extra': float,
        'mta_tax': float,
        'tip_amount': float,
        'tolls_amount': float,
        'improvement_surcharge': float,
        'total_amount': float,
        'congestion_surcharge': float
    }
    parse_dates = ["lpep_pickup_datetime", "lpep_dropoff_datetime"]

    # Read the first URL from the file and load it into the initial dataframe
    with open('taxi_url.txt', 'r') as url_file:
        first_line = next(url_file)
    df = pd.read_csv(first_line, sep=',', compression='gzip', dtype=taxi_dtypes, parse_dates=parse_dates)

    # Read the remaining URLs and append each file to the dataframe
    with open('taxi_url.txt', 'r') as text:
        links = text.read().splitlines()
    for url in links[1:]:
        df1 = pd.read_csv(url, sep=',', compression='gzip', dtype=taxi_dtypes, parse_dates=parse_dates)
        df = pd.concat([df, df1], sort=False)

    # Return output
    return df
hw2_transform_taxi.py
if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@transformer
def transform(data, *args, **kwargs):
    """ Alternative using NumPy:
    data = data[np.logical_not(data['passenger_count'].isin([0]))]
    df = data[np.logical_not(data['trip_distance'].isin([0]))]
    return df
    """
    # Replace NaN values with 0 (zero) in the passenger_count & trip_distance columns
    data['passenger_count'] = data['passenger_count'].fillna(0)
    data['trip_distance'] = data['trip_distance'].fillna(0)

    # Remove rows that have 0 (zero) values
    data = data[data['passenger_count'] != 0]
    data = data[data['trip_distance'] != 0]

    # Create a new column lpep_pickup_date by converting lpep_pickup_datetime to a date
    data['lpep_pickup_date'] = data['lpep_pickup_datetime'].dt.date

    # Rename column VendorID to vendor_id
    data.columns = data.columns.str.replace("VendorID", "vendor_id")

    # Return output
    return data
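The transformer above renames a single column by hand. Question 5 in the answers section asks how many columns need renaming to snake case; a hypothetical regex helper (not part of the homework blocks) that would automate such renames looks like this:

```python
import re

def camel_to_snake(name: str) -> str:
    """Convert CamelCase / mixed-case names like 'VendorID' to snake_case."""
    # Insert '_' where a capital follows a lower-case char or digit,
    # and before the last capital of an upper-case run followed by lower-case chars
    return re.sub(r'(?<=[a-z0-9])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])', '_', name).lower()

print(camel_to_snake('VendorID'))      # vendor_id
print(camel_to_snake('PULocationID'))  # pu_location_id
```

Applied with `data.columns = [camel_to_snake(c) for c in data.columns]`, this would replace the hard-coded `str.replace` call.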
hw2_export_taxi_postgres.py
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to a PostgreSQL database.
    Specify your configuration settings in 'io_config.yaml'.
    Docs: https://docs.mage.ai/design/data-loading#postgresql
    """
    # By default database_name = 'postgres' ==> io_config.yaml
    schema_name = 'mage'  # Specify the name of the schema to export data to
    table_name = 'green_taxi'  # Specify the name of the table to export data to
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'dev'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,  # Specifies whether to include index in exported table
            if_exists='replace',  # Specify resolution policy if table name already exists
        )
hw2_query_taxi.py
select count(*) from mage.green_taxi;
hw2_export_to_gcs_partition.py
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
import os
from os import path
import pandas as pd
from pandas import DataFrame
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.fs  # required so that pa.fs.GcsFileSystem is available

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "/home/src/<my key>.json"

project_id = 'dtc-de-course-2024-411803'
bucket_name = 'de-zoomcamp-garjita-bucket'
table_name = "green_taxi"
root_path = f'{bucket_name}/{table_name}'

config_path = path.join(get_repo_path(), 'io_config.yaml')
config_profile = 'default'


@data_exporter
def export_data(data, *args, **kwargs):
    data['lpep_pickup_date'] = data['lpep_pickup_datetime'].dt.date
    table = pa.Table.from_pandas(data)
    gcs = pa.fs.GcsFileSystem()

    # Write the table to GCS as a parquet dataset partitioned by pickup date
    pq.write_to_dataset(
        table,
        root_path=root_path,
        partition_cols=['lpep_pickup_date'],
        filesystem=gcs
    )
3. Execution
hw2-extract-taxi.py
hw2_transform_taxi.py
hw2_export_taxi_postgres.py
hw2_query_taxi.py
hw2_export_to_gcs_partition.py
4. GCS Bucket Files
- green_taxi table partitions list
- $ gcloud storage ls
- gs://de-zoomcamp-garjita-bucket/green_taxi/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2009-01-01/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-09-30/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-01/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-02/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-03/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-04/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-05/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-06/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-07/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-08/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-09/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-10/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-11/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-12/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-13/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-14/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-15/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-16/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-17/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-18/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-19/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-20/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-21/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-22/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-23/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-24/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-25/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-26/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-27/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-28/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-29/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-30/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-31/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-01/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-02/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-03/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-04/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-05/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-06/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-07/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-08/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-09/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-10/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-11/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-12/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-13/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-14/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-15/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-16/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-17/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-18/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-19/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-20/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-21/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-22/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-23/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-24/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-25/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-26/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-27/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-28/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-29/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-11-30/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-01/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-02/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-03/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-04/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-05/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-06/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-07/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-08/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-09/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-10/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-11/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-12/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-13/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-14/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-15/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-16/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-17/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-18/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-19/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-20/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-21/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-22/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-23/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-24/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-25/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-26/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-27/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-28/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-29/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-30/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-12-31/
- gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2021-01-01/
- Number of partition folders
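The partition count asked for in Question 6 can be read off the `gcloud storage ls` output above by counting the Hive-style `lpep_pickup_date=` prefixes. A small sketch of that counting logic (the sample listing here is abbreviated to three entries, not the full output):

```python
# Sample of the `gcloud storage ls` output; the root prefix must not be counted
listing = """
gs://de-zoomcamp-garjita-bucket/green_taxi/
gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-01/
gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-02/
gs://de-zoomcamp-garjita-bucket/green_taxi/lpep_pickup_date=2020-10-03/
""".split()

# Count only the partition folders
n_partitions = sum('lpep_pickup_date=' in p for p in listing)
print(n_partitions)  # 3
```

Running the same count over the full listing (or `gcloud storage ls gs://.../green_taxi/ | grep -c 'lpep_pickup_date='`) yields the answer reported below.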
5. Pipeline Scheduling
6. Answers
Question 1. Data Loading
Once the dataset is loaded, what’s the shape of the data?
Answer 1 : 266,855 rows x 20 columns
Note: Available in the execution result of the hw2-extract-taxi.py block
Question 2. Data Transformation
Upon filtering the dataset where the passenger count is greater than 0 and the trip distance is greater than zero, how many rows are left?
Answer 2 : 139,370 rows
Note: Available in the execution result of the hw2_transform_taxi.py block
Question 3. Data Transformation
Which of the following creates a new column lpep_pickup_date by converting lpep_pickup_datetime to a date?
Answer 3 : data['lpep_pickup_date'] = data['lpep_pickup_datetime'].dt.date
Note: Used in the hw2_transform_taxi.py block
Question 4. Data Transformation
What are the existing values of VendorID in the dataset?
Answer 4 : 1 or 2
Note:
Question 5. Data Transformation
How many columns need to be renamed to snake case?
Answer 5 : 2
Note:
- lpep_pickup_datetime –> lpep_pickup_date
- VendorID –> vendor_id
Question 6. Data Exporting
Once exported, how many partitions (folders) are present in Google Cloud?
Answer 6 : 96
Note: