diff --git a/pipeline/dimensions.py b/pipeline/dimensions.py index 3fed4f7..af0080b 100644 --- a/pipeline/dimensions.py +++ b/pipeline/dimensions.py @@ -21,40 +21,51 @@ class DimAccounts(Dimensions): def transform(self): # Read the parquet file into a polars DataFrame - accounts_df = pl.read_parquet(self.file_path) + try: + accounts_df = pl.read_parquet(self.file_path) + except Exception as e: + logging.error(f"Failed to read the base accounts parquet file: {e}") + return # Transform the DataFrame logging.info("Transforming the accounts DataFrame") - accounts_df = ( - accounts_df - .with_columns([ - pl.col("id").alias("account_id"), - pl.col("name").alias("account_name"), - pl.col("type").alias("account_type"), - pl.col("on_budget").alias("on_budget"), - pl.col("closed").alias("closed"), - pl.col("note").alias("note"), - pl.col("balance").alias("balance"), - pl.col("cleared_balance").alias("cleared_balance"), - pl.col("uncleared_balance").alias("uncleared_balance"), - pl.col("deleted").alias("deleted"), - ]) - .with_columns([ - pl.col("note").fill_null("unknown"), - (pl.col("balance") / 100).alias("balance"), - (pl.col("cleared_balance") / 100).alias("cleared_balance"), - (pl.col("uncleared_balance") / 100).alias("uncleared_balance"), - ]) - .drop([ - "transfer_payee_id", "direct_import_linked", "direct_import_in_error", - "last_reconciled_at", "debt_original_balance", "debt_interest_rates", - "debt_minimum_payments", "debt_escrow_amounts", "ingestion_date" - ]) - ) + try: + accounts_df = ( + accounts_df + .with_columns([ + pl.col("id").alias("account_id"), + pl.col("name").alias("account_name"), + pl.col("type").alias("account_type"), + pl.col("on_budget").alias("on_budget"), + pl.col("closed").alias("closed"), + pl.col("note").alias("note"), + pl.col("balance").alias("balance"), + pl.col("cleared_balance").alias("cleared_balance"), + pl.col("uncleared_balance").alias("uncleared_balance"), + pl.col("deleted").alias("deleted"), + ]) + .with_columns([ + pl.col("note").fill_null("unknown"), + (pl.col("balance") / 100).alias("balance"), + (pl.col("cleared_balance") / 100).alias("cleared_balance"), + (pl.col("uncleared_balance") / 100).alias("uncleared_balance"), + ]) + .drop([ + "transfer_payee_id", "direct_import_linked", "direct_import_in_error", + "last_reconciled_at", "debt_original_balance", "debt_interest_rates", + "debt_minimum_payments", "debt_escrow_amounts", "ingestion_date" + ]) + ) + except Exception as e: + logging.error(f"Failed to transform the accounts DataFrame: {e}") + return # Write the DataFrame to a new parquet file logging.info("Writing the transformed accounts DataFrame to parquet file") - accounts_df.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet') - + try: + accounts_df.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet') + except Exception as e: + logging.error(f"Failed to write the transformed accounts DataFrame to parquet file: {e}") + return class DimCategories(Dimensions): def __init__(self, config): @@ -64,36 +75,52 @@ class DimCategories(Dimensions): def transform(self): # Read the parquet file into a polars DataFrame - categories_df = pl.read_parquet(self.file_path) + try: + categories_df = pl.read_parquet(self.file_path) + except Exception as e: + logging.error(f"Failed to read the base categories parquet file: {e}") + return logging.info("Transforming the categories DataFrame") - # Select the required columns - categories_df = categories_df.select([ - 'id', - 'name', - 'category_group_name', - 'hidden', - 'note', - 'budgeted', - 'activity', - 'balance', - 'deleted' - ]) - # Rename the columns - categories_df = categories_df.with_columns(pl.col('id').alias('category_id')) - categories_df = categories_df.with_columns(pl.col('name').alias('category_name')) + try: + categories_df = categories_df.select([ + 'id', + 'name', + 'category_group_name', + 'hidden', + 'note', + 'budgeted', + 'activity', + 'balance', + 'deleted' + ]) + except Exception as e: + logging.error(f"Failed to select columns from the categories DataFrame: {e}") + return + + try: + # Rename the columns + categories_df = categories_df.with_columns(pl.col('id').alias('category_id')) + categories_df = categories_df.with_columns(pl.col('name').alias('category_name')) - # Fill null values in the note column - categories_df = categories_df.with_columns(pl.col('note').fill_null('unknown')) + # Fill null values in the note column + categories_df = categories_df.with_columns(pl.col('note').fill_null('unknown')) - # Convert the balance, budgeted, and activity columns to decimal - categories_df = categories_df.with_columns(pl.col('balance') / 100) - categories_df = categories_df.with_columns(pl.col('budgeted') / 100) - categories_df = categories_df.with_columns(pl.col('activity') / 100) + # Convert the balance, budgeted, and activity columns to decimal + categories_df = categories_df.with_columns(pl.col('balance') / 100) + categories_df = categories_df.with_columns(pl.col('budgeted') / 100) + categories_df = categories_df.with_columns(pl.col('activity') / 100) + except Exception as e: + logging.error(f"Failed to transform the categories DataFrame: {e}") + return # Write the DataFrame to a new parquet file logging.info("Writing the transformed categories DataFrame to parquet file") - categories_df.write_parquet(self.config['warehouse_data_path'] + '/categories.parquet') - + try: + categories_df.write_parquet(self.config['warehouse_data_path'] + '/categories.parquet') + except Exception as e: + logging.error(f"Failed to write the transformed categories DataFrame to parquet file: {e}") + return + class DimPayees(Dimensions): def __init__(self, config): super().__init__(config) @@ -102,22 +129,36 @@ class DimPayees(Dimensions): def transform(self): # Read the parquet file into a polars DataFrame - payees_df = pl.read_parquet(self.file_path) + try: + payees_df = pl.read_parquet(self.file_path) + except Exception as e: + logging.error(f"Failed to read the base payees parquet file: {e}") + return logging.info("Transforming the payees DataFrame") - # Select the required columns - payees_df = payees_df.select([ - 'id', - 'name', - 'deleted' - ]) - # Rename the columns - payees_df = payees_df.with_columns(pl.col('id').alias('payee_id')) - payees_df = payees_df.with_columns(pl.col('name').alias('payee_name')) + try: + payees_df = payees_df.select([ + 'id', + 'name', + 'deleted' + ]) + except Exception as e: + logging.error(f"Failed to select columns from the payees DataFrame: {e}") + return + try: + # Rename the columns + payees_df = payees_df.with_columns(pl.col('id').alias('payee_id')) + payees_df = payees_df.with_columns(pl.col('name').alias('payee_name')) + except Exception as e: + logging.error(f"Failed to rename columns in the payees DataFrame: {e}") + return # Write the DataFrame to a new parquet file logging.info("Writing the transformed payees DataFrame to parquet file") - payees_df.write_parquet(self.config['warehouse_data_path'] + '/payees.parquet') - + try: + payees_df.write_parquet(self.config['warehouse_data_path'] + '/payees.parquet') + except Exception as e: + logging.error(f"Failed to write the transformed payees DataFrame to parquet file: {e}") + return class DimDate(Dimensions): def __init__(self, config): @@ -126,20 +167,35 @@ class DimDate(Dimensions): def transform(self): # Create a DataFrame with dates from 2020-01-01 to 2030-12-31 - dates_df = pl.DataFrame({'date':pl.date_range(date(2020, 1, 1), date(2030, 12, 31), "1d", eager=True)}) - + try: + dates_df = pl.DataFrame({'date':pl.date_range(date(2020, 1, 1), date(2030, 12, 31), "1d", eager=True)}) + except Exception as e: + logging.error(f"Failed to create a DataFrame with dates: {e}") + return # Extract year, month, day, and weekday from the date column - dates_df = dates_df.with_columns([ - pl.col('date').dt.year().alias('year'), - pl.col('date').dt.month().alias('month'), - pl.col('date').dt.day().alias('day'), - pl.col('date').dt.weekday().alias('weekday') - ]) - # Create a new column to indicate if the date is a weekday or weekend - dates_df = dates_df.with_columns([ - (pl.col('weekday') < 5).alias('is_weekday') # True for weekdays (Monday to Friday), False for weekends (Saturday and Sunday) - ]) + try: + dates_df = dates_df.with_columns([ + pl.col('date').dt.year().alias('year'), + pl.col('date').dt.month().alias('month'), + pl.col('date').dt.day().alias('day'), + pl.col('date').dt.weekday().alias('weekday') + ]) + except Exception as e: + logging.error(f"Failed to extract year, month, day, and weekday from the date column: {e}") + return + try: + # Create a new column to indicate if the date is a weekday or weekend + dates_df = dates_df.with_columns([ + (pl.col('weekday') < 5).alias('is_weekday') # True for weekdays (Monday to Friday), False for weekends (Saturday and Sunday) + ]) + except Exception as e: + logging.error(f"Failed to create a new column to indicate if the date is a weekday or weekend: {e}") + return # Write the DataFrame to a new parquet file logging.info("Writing the transformed dates DataFrame to parquet file") - dates_df.write_parquet(self.config['warehouse_data_path'] + '/dates.parquet') + try: + dates_df.write_parquet(self.config['warehouse_data_path'] + '/dates.parquet') + except Exception as e: + logging.error(f"Failed to write the transformed dates DataFrame to parquet file: {e}") + return diff --git a/pipeline/facts.py b/pipeline/facts.py index 8858f8e..7611826 100644 --- a/pipeline/facts.py +++ b/pipeline/facts.py @@ -1,7 +1,6 @@ import polars as pl import logging import os -from datetime import date class Facts: def __init__(self, config): @@ -13,7 +12,6 @@ class Facts: return f"{self.base_file_path}/{file_name}" class FactTransactions(Facts): - def __init__(self, config): super().__init__(config) self.file_path = self.get_full_file_path('transactions.parquet') @@ -21,43 +19,52 @@ class FactTransactions(Facts): def transform(self): # Read the parquet file into a polars DataFrame - transactions_df = pl.read_parquet(self.file_path) - + try: + transactions_df = pl.read_parquet(self.file_path) + except FileNotFoundError: + logging.error("The transactions DataFrame does not exist") + return + # Transform the DataFrame logging.info("Transforming the transactions DataFrame") - transactions_df = ( - transactions_df - .with_columns([ - pl.col("id").alias("transaction_id"), - pl.col("date").alias("transaction_date"), - pl.col("amount").alias("transaction_amount"), - pl.col("memo").alias("transaction_memo"), - pl.col("cleared").alias("transaction_cleared"), - pl.col("approved").alias("transaction_approved"), - pl.col("flag_color").alias("transaction_flag_color"), - pl.col("account_id").alias("account_id"), - pl.col("payee_id").alias("payee_id"), - pl.col("category_id").alias("category_id"), - pl.col("transfer_account_id").alias("transfer_account_id"), - ]) - .with_columns([ - pl.col("memo").fill_null("unknown"), - (pl.col("amount") / 100).alias("transaction_amount"), - ]) - .drop([ - "transfer_transaction_id", "matched_transaction_id", "import_id", - "subtransactions", "deleted","flag_name","account_name", - "payee_name","category_name","import_payee_name","import_payee_name_original", - "debt_transaction_type","ingestion_date" - ]) - ) - + try: + transactions_df = ( + transactions_df + .with_columns([ + pl.col("id").alias("transaction_id"), + pl.col("date").alias("transaction_date"), + pl.col("amount").alias("transaction_amount"), + pl.col("memo").alias("transaction_memo"), + pl.col("cleared").alias("transaction_cleared"), + pl.col("approved").alias("transaction_approved"), + pl.col("flag_color").alias("transaction_flag_color"), + pl.col("account_id").alias("account_id"), + pl.col("payee_id").alias("payee_id"), + pl.col("category_id").alias("category_id"), + pl.col("transfer_account_id").alias("transfer_account_id"), + ]) + .with_columns([ + pl.col("memo").fill_null("unknown"), + (pl.col("amount") / 100).alias("transaction_amount"), + ]) + .drop([ + "transfer_transaction_id", "matched_transaction_id", "import_id", + "subtransactions", "deleted","flag_name","account_name", + "payee_name","category_name","import_payee_name","import_payee_name_original", + "debt_transaction_type","ingestion_date" + ]) + ) + except Exception as e: + logging.error(f"Failed to transform the transactions DataFrame: {e}") + return # Write the DataFrame to a new parquet file logging.info("Writing the transformed transactions DataFrame to parquet file") - transactions_df.write_parquet(self.config['warehouse_data_path'] + '/transactions.parquet') + try: + transactions_df.write_parquet(self.config['warehouse_data_path'] + '/transactions.parquet') + except Exception as e: + logging.error(f"Failed to write the transformed transactions DataFrame: {e}") class FactScheduledTransactions(Facts): - def __init__(self, config): super().__init__(config) self.file_path = self.get_full_file_path('scheduled_transactions.parquet') @@ -73,30 +80,37 @@ class FactScheduledTransactions(Facts): # Transform the DataFrame logging.info("Transforming the scheduled transactions DataFrame") - scheduled_transactions_df = ( - scheduled_transactions_df - .with_columns([ - pl.col("id").alias("scheduled_transaction_id"), - pl.col("date_first").alias("scheduled_transaction_first_date"), - pl.col("date_next").alias("scheduled_transaction_next_date"), - pl.col("frequency").alias("scheduled_transaction_frequency"), - pl.col("amount").alias("scheduled_transaction_amount"), - pl.col("memo").alias("scheduled_transaction_memo"), - pl.col("flag_color").alias("scheduled_transaction_flag_color"), - pl.col("account_id").alias("account_id"), - pl.col("payee_id").alias("payee_id"), - pl.col("category_id").alias("category_id"), - pl.col("transfer_account_id").alias("transfer_account_id"), - ]) - .with_columns([ - pl.col("memo").fill_null("unknown"), - (pl.col("amount") / 100).alias("scheduled_transaction_amount"), - ]) - .drop([ - "subtransactions", "deleted","flag_name","account_name", - "payee_name","category_name","ingestion_date" - ]) - ) + try: + scheduled_transactions_df = ( + scheduled_transactions_df + .with_columns([ + pl.col("id").alias("scheduled_transaction_id"), + pl.col("date_first").alias("scheduled_transaction_first_date"), + pl.col("date_next").alias("scheduled_transaction_next_date"), + pl.col("frequency").alias("scheduled_transaction_frequency"), + pl.col("amount").alias("scheduled_transaction_amount"), + pl.col("memo").alias("scheduled_transaction_memo"), + pl.col("flag_color").alias("scheduled_transaction_flag_color"), + pl.col("account_id").alias("account_id"), + pl.col("payee_id").alias("payee_id"), + pl.col("category_id").alias("category_id"), + pl.col("transfer_account_id").alias("transfer_account_id"), + ]) + .with_columns([ + pl.col("memo").fill_null("unknown"), + (pl.col("amount") / 100).alias("scheduled_transaction_amount"), + ]) + .drop([ + "subtransactions", "deleted","flag_name","account_name", + "payee_name","category_name","ingestion_date" + ]) + ) + except Exception as e: + logging.error(f"Failed to transform the scheduled transactions DataFrame: {e}") + return # Write the DataFrame to a new parquet file logging.info("Writing the transformed scheduled transactions DataFrame to parquet file") - scheduled_transactions_df.write_parquet(self.config['warehouse_data_path'] + '/scheduled_transactions.parquet') + try: + scheduled_transactions_df.write_parquet(self.config['warehouse_data_path'] + '/scheduled_transactions.parquet') + except Exception as e: + logging.error(f"Failed to write the transformed scheduled transactions DataFrame: {e}")