diff --git a/docs/dataflow.md b/docs/dataflow.md index 37f0fb7..1f2bcfc 100644 --- a/docs/dataflow.md +++ b/docs/dataflow.md @@ -28,3 +28,7 @@ The Data Warehouse is the data after it has been aggregated and transformed. It ## Processed Archive The Processed Archive is the data after it has been processed and stored in the base tables. It is the raw json files in the `data/processed/` directory with a folder for each entity and file for each load that has been processed. + +## Visualisation datasets + +When preparing the data for visualisation, we create dataframes in memory that are used to create the visualisations. These are not stored on disk. diff --git a/main.py b/main.py index 834983c..e06bcec 100644 --- a/main.py +++ b/main.py @@ -8,7 +8,7 @@ import logging.config import logging.handlers import config.exit_codes as ec -from dash_app import app +#from dash_app import app from pipeline.pipeline_main import pipeline_main def set_up_logging(): @@ -58,7 +58,7 @@ config['BUDGET_ID'] = BUDGET_ID if __name__ == '__main__': try: pipeline_main(config) - app.run() #debug=True) + # app.run() #debug=True) except SystemExit as e: exit_code = e.code if exit_code == ec.SUCCESS: diff --git a/pipeline/dimensions.py b/pipeline/dimensions.py index 2bc51e7..ee7f9e4 100644 --- a/pipeline/dimensions.py +++ b/pipeline/dimensions.py @@ -22,47 +22,55 @@ class DimAccounts(Dimensions): def transform(self): # Read the parquet file into a polars DataFrame try: - accounts_df = pl.read_parquet(self.file_path) + source_accounts = pl.read_parquet(self.file_path) except Exception as e: logging.error(f"Failed to read the base accounts parquet file: {e}") return - # Transform the DataFrame logging.info("Transforming the accounts DataFrame") try: - accounts_df = ( - accounts_df - .with_columns([ - pl.col("id").alias("account_id"), - pl.col("name").alias("account_name"), - pl.col("type").alias("account_type"), - pl.col("on_budget").alias("on_budget"), - pl.col("closed").alias("closed"), - pl.col("note").alias("note"), - pl.col("balance").alias("balance"), - pl.col("cleared_balance").alias("cleared_balance"), - pl.col("uncleared_balance").alias("uncleared_balance"), - pl.col("deleted").alias("deleted"), - ]) - .with_columns([ - pl.col("note").fill_null("unknown"), - (pl.col("balance") / 100).alias("balance"), - (pl.col("cleared_balance") / 100).alias("cleared_balance"), - (pl.col("uncleared_balance") / 100).alias("uncleared_balance"), - ]) - .drop([ - "transfer_payee_id", "direct_import_linked", "direct_import_in_error", - "last_reconciled_at", "debt_original_balance", "debt_interest_rates", - "debt_minimum_payments", "debt_escrow_amounts", "ingestion_date" + base_accounts = ( + source_accounts.select([ + "id", + "name", + "type", + "on_budget", + "closed", + "note", + "balance", + "cleared_balance", + "uncleared_balance", + "deleted" ]) ) + except Exception as e: + logging.error(f"Failed to select columns from the categories DataFrame: {e}") + return + + try: + add_accounts_prefix = base_accounts.with_columns([ + pl.col("id").alias("account_id"), + pl.col("name").alias("account_name"), + pl.col("type").alias("account_type") + ]) + fill_accounts_null_values = add_accounts_prefix.with_columns([ + pl.col('note').fill_null('none') + ]) + fix_accounts_values = fill_accounts_null_values.with_columns([ + (pl.col("balance") / 1000).alias("balance"), + (pl.col("cleared_balance") / 1000).alias("cleared_balance"), + (pl.col("uncleared_balance") / 1000).alias("uncleared_balance"), + ]) + drop_accounts_columns = fix_accounts_values.drop([ + "id", "name", "type" + ]) except Exception as e: logging.error(f"Failed to transform the accounts DataFrame: {e}") return - # Write the DataFrame to a new parquet file + logging.info("Writing the transformed accounts DataFrame to parquet file") try: - accounts_df.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet') + drop_accounts_columns.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet') except Exception as e: logging.error(f"Failed to write the transformed accounts DataFrame to parquet file: {e}") return @@ -74,15 +82,14 @@ class DimCategories(Dimensions): self.transform() def transform(self): - # Read the parquet file into a polars DataFrame try: - categories_df = pl.read_parquet(self.file_path) + source_categories = pl.read_parquet(self.file_path) except Exception as e: logging.error(f"Failed to read the base categories parquet file: {e}") return logging.info("Transforming the categories DataFrame") try: - categories_df = categories_df.select([ + base_categories = source_categories.select([ 'id', 'name', 'category_group_name', @@ -98,29 +105,32 @@ class DimCategories(Dimensions): return try: - # Rename the columns - categories_df = categories_df.with_columns(pl.col('id').alias('category_id')) - categories_df = categories_df.with_columns(pl.col('name').alias('category_name')) - - # Fill null values in the note column - categories_df = categories_df.with_columns(pl.col('note').fill_null('unknown')) - - # Convert the balance, budgeted, and activity columns to decimal - categories_df = categories_df.with_columns(pl.col('balance') / 100) - categories_df = categories_df.with_columns(pl.col('budgeted') / 100) - categories_df = categories_df.with_columns(pl.col('activity') / 100) + add_categories_prefix = base_categories.with_columns([ + pl.col('id').alias('category_id'), + pl.col('name').alias('category_name') + ]) + fill_null_category_values = add_categories_prefix.with_columns([ + pl.col('note').fill_null('none') + ]) + fix_categories_values = fill_null_category_values.with_columns([ + (pl.col('balance') / 100), + (pl.col('budgeted') / 100), + (pl.col('activity') / 100) + ]) + drop_categories_columns = fix_categories_values.drop([ + 'id', 'name' + ]) except Exception as e: logging.error(f"Failed to transform the categories DataFrame: {e}") return - # Write the DataFrame to a new parquet file logging.info("Writing the transformed categories DataFrame to parquet file") try: - categories_df.write_parquet(self.config['warehouse_data_path'] + '/categories.parquet') + drop_categories_columns.write_parquet(self.config['warehouse_data_path'] + '/categories.parquet') except Exception as e: logging.error(f"Failed to write the transformed categories DataFrame to parquet file: {e}") return - + class DimPayees(Dimensions): def __init__(self, config): super().__init__(config) @@ -128,15 +138,14 @@ class DimPayees(Dimensions): self.transform() def transform(self): - # Read the parquet file into a polars DataFrame try: - payees_df = pl.read_parquet(self.file_path) + source_payees = pl.read_parquet(self.file_path) except Exception as e: logging.error(f"Failed to read the base payees parquet file: {e}") return logging.info("Transforming the payees DataFrame") try: - payees_df = payees_df.select([ + base_payees = source_payees.select([ 'id', 'name', 'deleted' @@ -144,10 +153,15 @@ class DimPayees(Dimensions): except Exception as e: logging.error(f"Failed to select columns from the payees DataFrame: {e}") return + try: - # Rename the columns - payees_df = payees_df.with_columns(pl.col('id').alias('payee_id')) - payees_df = payees_df.with_columns(pl.col('name').alias('payee_name')) + add_payees_prefix = base_payees.with_columns([ + pl.col('id').alias('payee_id'), + pl.col('name').alias('payee_name') + ]) + drop_payees_columns = add_payees_prefix.drop([ + 'id', 'name' + ]) except Exception as e: logging.error(f"Failed to rename columns in the payees DataFrame: {e}") return @@ -155,7 +169,7 @@ class DimPayees(Dimensions): # Write the DataFrame to a new parquet file logging.info("Writing the transformed payees DataFrame to parquet file") try: - payees_df.write_parquet(self.config['warehouse_data_path'] + '/payees.parquet') + drop_payees_columns.write_parquet(self.config['warehouse_data_path'] + '/payees.parquet') except Exception as e: logging.error(f"Failed to write the transformed payees DataFrame to parquet file: {e}") return