added error handling to facts and dimensions transformations

This commit is contained in:
Jake Pullen
2024-08-10 19:45:17 +01:00
parent 861ccd0dc4
commit 19b633eb40
2 changed files with 206 additions and 136 deletions
+61 -5
View File
@@ -21,10 +21,15 @@ class DimAccounts(Dimensions):
def transform(self): def transform(self):
# Read the parquet file into a polars DataFrame # Read the parquet file into a polars DataFrame
try:
accounts_df = pl.read_parquet(self.file_path) accounts_df = pl.read_parquet(self.file_path)
except Exception as e:
logging.error(f"Failed to read the base accounts parquet file: {e}")
return
# Transform the DataFrame # Transform the DataFrame
logging.info("Transforming the accounts DataFrame") logging.info("Transforming the accounts DataFrame")
try:
accounts_df = ( accounts_df = (
accounts_df accounts_df
.with_columns([ .with_columns([
@@ -51,10 +56,16 @@ class DimAccounts(Dimensions):
"debt_minimum_payments", "debt_escrow_amounts", "ingestion_date" "debt_minimum_payments", "debt_escrow_amounts", "ingestion_date"
]) ])
) )
except Exception as e:
logging.error(f"Failed to transform the accounts DataFrame: {e}")
return
# Write the DataFrame to a new parquet file # Write the DataFrame to a new parquet file
logging.info("Writing the transformed accounts DataFrame to parquet file") logging.info("Writing the transformed accounts DataFrame to parquet file")
try:
accounts_df.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet') accounts_df.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet')
except Exception as e:
logging.error(f"Failed to write the transformed accounts DataFrame to parquet file: {e}")
return
class DimCategories(Dimensions): class DimCategories(Dimensions):
def __init__(self, config): def __init__(self, config):
@@ -64,9 +75,13 @@ class DimCategories(Dimensions):
def transform(self): def transform(self):
# Read the parquet file into a polars DataFrame # Read the parquet file into a polars DataFrame
try:
categories_df = pl.read_parquet(self.file_path) categories_df = pl.read_parquet(self.file_path)
except Exception as e:
logging.error(f"Failed to read the base categories parquet file: {e}")
return
logging.info("Transforming the categories DataFrame") logging.info("Transforming the categories DataFrame")
# Select the required columns try:
categories_df = categories_df.select([ categories_df = categories_df.select([
'id', 'id',
'name', 'name',
@@ -78,6 +93,11 @@ class DimCategories(Dimensions):
'balance', 'balance',
'deleted' 'deleted'
]) ])
except Exception as e:
logging.error(f"Failed to select columns from the categories DataFrame: {e}")
return
try:
# Rename the columns # Rename the columns
categories_df = categories_df.with_columns(pl.col('id').alias('category_id')) categories_df = categories_df.with_columns(pl.col('id').alias('category_id'))
categories_df = categories_df.with_columns(pl.col('name').alias('category_name')) categories_df = categories_df.with_columns(pl.col('name').alias('category_name'))
@@ -89,10 +109,17 @@ class DimCategories(Dimensions):
categories_df = categories_df.with_columns(pl.col('balance') / 100) categories_df = categories_df.with_columns(pl.col('balance') / 100)
categories_df = categories_df.with_columns(pl.col('budgeted') / 100) categories_df = categories_df.with_columns(pl.col('budgeted') / 100)
categories_df = categories_df.with_columns(pl.col('activity') / 100) categories_df = categories_df.with_columns(pl.col('activity') / 100)
except Exception as e:
logging.error(f"Failed to transform the categories DataFrame: {e}")
return
# Write the DataFrame to a new parquet file # Write the DataFrame to a new parquet file
logging.info("Writing the transformed categories DataFrame to parquet file") logging.info("Writing the transformed categories DataFrame to parquet file")
try:
categories_df.write_parquet(self.config['warehouse_data_path'] + '/categories.parquet') categories_df.write_parquet(self.config['warehouse_data_path'] + '/categories.parquet')
except Exception as e:
logging.error(f"Failed to write the transformed categories DataFrame to parquet file: {e}")
return
class DimPayees(Dimensions): class DimPayees(Dimensions):
def __init__(self, config): def __init__(self, config):
@@ -102,22 +129,36 @@ class DimPayees(Dimensions):
def transform(self): def transform(self):
# Read the parquet file into a polars DataFrame # Read the parquet file into a polars DataFrame
try:
payees_df = pl.read_parquet(self.file_path) payees_df = pl.read_parquet(self.file_path)
except Exception as e:
logging.error(f"Failed to read the base payees parquet file: {e}")
return
logging.info("Transforming the payees DataFrame") logging.info("Transforming the payees DataFrame")
# Select the required columns try:
payees_df = payees_df.select([ payees_df = payees_df.select([
'id', 'id',
'name', 'name',
'deleted' 'deleted'
]) ])
except Exception as e:
logging.error(f"Failed to select columns from the payees DataFrame: {e}")
return
try:
# Rename the columns # Rename the columns
payees_df = payees_df.with_columns(pl.col('id').alias('payee_id')) payees_df = payees_df.with_columns(pl.col('id').alias('payee_id'))
payees_df = payees_df.with_columns(pl.col('name').alias('payee_name')) payees_df = payees_df.with_columns(pl.col('name').alias('payee_name'))
except Exception as e:
logging.error(f"Failed to rename columns in the payees DataFrame: {e}")
return
# Write the DataFrame to a new parquet file # Write the DataFrame to a new parquet file
logging.info("Writing the transformed payees DataFrame to parquet file") logging.info("Writing the transformed payees DataFrame to parquet file")
try:
payees_df.write_parquet(self.config['warehouse_data_path'] + '/payees.parquet') payees_df.write_parquet(self.config['warehouse_data_path'] + '/payees.parquet')
except Exception as e:
logging.error(f"Failed to write the transformed payees DataFrame to parquet file: {e}")
return
class DimDate(Dimensions): class DimDate(Dimensions):
def __init__(self, config): def __init__(self, config):
@@ -126,20 +167,35 @@ class DimDate(Dimensions):
def transform(self): def transform(self):
# Create a DataFrame with dates from 2020-01-01 to 2030-12-31 # Create a DataFrame with dates from 2020-01-01 to 2030-12-31
try:
dates_df = pl.DataFrame({'date':pl.date_range(date(2020, 1, 1), date(2030, 12, 31), "1d", eager=True)}) dates_df = pl.DataFrame({'date':pl.date_range(date(2020, 1, 1), date(2030, 12, 31), "1d", eager=True)})
except Exception as e:
logging.error(f"Failed to create a DataFrame with dates: {e}")
return
# Extract year, month, day, and weekday from the date column # Extract year, month, day, and weekday from the date column
try:
dates_df = dates_df.with_columns([ dates_df = dates_df.with_columns([
pl.col('date').dt.year().alias('year'), pl.col('date').dt.year().alias('year'),
pl.col('date').dt.month().alias('month'), pl.col('date').dt.month().alias('month'),
pl.col('date').dt.day().alias('day'), pl.col('date').dt.day().alias('day'),
pl.col('date').dt.weekday().alias('weekday') pl.col('date').dt.weekday().alias('weekday')
]) ])
except Exception as e:
logging.error(f"Failed to extract year, month, day, and weekday from the date column: {e}")
return
try:
# Create a new column to indicate if the date is a weekday or weekend # Create a new column to indicate if the date is a weekday or weekend
dates_df = dates_df.with_columns([ dates_df = dates_df.with_columns([
(pl.col('weekday') < 5).alias('is_weekday') # True for weekdays (Monday to Friday), False for weekends (Saturday and Sunday) (pl.col('weekday') < 5).alias('is_weekday') # True for weekdays (Monday to Friday), False for weekends (Saturday and Sunday)
]) ])
except Exception as e:
logging.error(f"Failed to create a new column to indicate if the date is a weekday or weekend: {e}")
return
# Write the DataFrame to a new parquet file # Write the DataFrame to a new parquet file
logging.info("Writing the transformed dates DataFrame to parquet file") logging.info("Writing the transformed dates DataFrame to parquet file")
try:
dates_df.write_parquet(self.config['warehouse_data_path'] + '/dates.parquet') dates_df.write_parquet(self.config['warehouse_data_path'] + '/dates.parquet')
except Exception as e:
logging.error(f"Failed to write the transformed dates DataFrame to parquet file: {e}")
return
+18 -4
View File
@@ -1,7 +1,6 @@
import polars as pl import polars as pl
import logging import logging
import os import os
from datetime import date
class Facts: class Facts:
def __init__(self, config): def __init__(self, config):
@@ -13,7 +12,6 @@ class Facts:
return f"{self.base_file_path}/{file_name}" return f"{self.base_file_path}/{file_name}"
class FactTransactions(Facts): class FactTransactions(Facts):
def __init__(self, config): def __init__(self, config):
super().__init__(config) super().__init__(config)
self.file_path = self.get_full_file_path('transactions.parquet') self.file_path = self.get_full_file_path('transactions.parquet')
@@ -21,10 +19,15 @@ class FactTransactions(Facts):
def transform(self): def transform(self):
# Read the parquet file into a polars DataFrame # Read the parquet file into a polars DataFrame
try:
transactions_df = pl.read_parquet(self.file_path) transactions_df = pl.read_parquet(self.file_path)
except FileNotFoundError:
logging.error("The transactions DataFrame does not exist")
return
# Transform the DataFrame # Transform the DataFrame
logging.info("Transforming the transactions DataFrame") logging.info("Transforming the transactions DataFrame")
try:
transactions_df = ( transactions_df = (
transactions_df transactions_df
.with_columns([ .with_columns([
@@ -51,13 +54,17 @@ class FactTransactions(Facts):
"debt_transaction_type","ingestion_date" "debt_transaction_type","ingestion_date"
]) ])
) )
except Exception as e:
logging.error(f"Failed to transform the transactions DataFrame: {e}")
return
# Write the DataFrame to a new parquet file # Write the DataFrame to a new parquet file
logging.info("Writing the transformed transactions DataFrame to parquet file") logging.info("Writing the transformed transactions DataFrame to parquet file")
try:
transactions_df.write_parquet(self.config['warehouse_data_path'] + '/transactions.parquet') transactions_df.write_parquet(self.config['warehouse_data_path'] + '/transactions.parquet')
except Exception as e:
logging.error(f"Failed to write the transformed transactions DataFrame: {e}")
class FactScheduledTransactions(Facts): class FactScheduledTransactions(Facts):
def __init__(self, config): def __init__(self, config):
super().__init__(config) super().__init__(config)
self.file_path = self.get_full_file_path('scheduled_transactions.parquet') self.file_path = self.get_full_file_path('scheduled_transactions.parquet')
@@ -73,6 +80,7 @@ class FactScheduledTransactions(Facts):
# Transform the DataFrame # Transform the DataFrame
logging.info("Transforming the scheduled transactions DataFrame") logging.info("Transforming the scheduled transactions DataFrame")
try:
scheduled_transactions_df = ( scheduled_transactions_df = (
scheduled_transactions_df scheduled_transactions_df
.with_columns([ .with_columns([
@@ -97,6 +105,12 @@ class FactScheduledTransactions(Facts):
"payee_name","category_name","ingestion_date" "payee_name","category_name","ingestion_date"
]) ])
) )
except Exception as e:
logging.error(f"Failed to transform the scheduled transactions DataFrame: {e}")
return
# Write the DataFrame to a new parquet file # Write the DataFrame to a new parquet file
logging.info("Writing the transformed scheduled transactions DataFrame to parquet file") logging.info("Writing the transformed scheduled transactions DataFrame to parquet file")
try:
scheduled_transactions_df.write_parquet(self.config['warehouse_data_path'] + '/scheduled_transactions.parquet') scheduled_transactions_df.write_parquet(self.config['warehouse_data_path'] + '/scheduled_transactions.parquet')
except Exception as e:
logging.error(f"Failed to write the transformed scheduled transactions DataFrame: {e}")