diff --git a/config/exit_codes.py b/config/exit_codes.py index 5c508c0..5408295 100644 --- a/config/exit_codes.py +++ b/config/exit_codes.py @@ -10,4 +10,7 @@ NOT_FOUND = 8 CONFLICT = 9 MOVE_FILE_ERROR = 10 DUPLICATE_RESOLUTION_ERROR = 11 -UNIQUE_ID_NOT_FOUND = 12 \ No newline at end of file +UNIQUE_ID_NOT_FOUND = 12 +NO_DATA_PRODUCED = 13 +MISSING_DATA_FILES = 14 +BAD_JOIN = 15 \ No newline at end of file diff --git a/dash_app.py b/dash_app.py index 9533285..71e13e7 100644 --- a/dash_app.py +++ b/dash_app.py @@ -5,22 +5,33 @@ import plotly.express as px from dash import Dash, html, dcc import dash_bootstrap_components as dbc import pandas as pd +import logging +import sys +import config.exit_codes as ec -accounts = pl.read_parquet('data/warehouse/accounts.parquet') -categories = pl.read_parquet('data/warehouse/categories.parquet') -dates = pl.read_parquet('data/warehouse/dates.parquet') -payees = pl.read_parquet('data/warehouse/payees.parquet') -scheduled_transactions = pl.read_parquet('data/warehouse/scheduled_transactions.parquet') -transactions = pl.read_parquet('data/warehouse/transactions.parquet') +try: + accounts = pl.read_parquet('data/warehouse/accounts.parquet') + categories = pl.read_parquet('data/warehouse/categories.parquet') + dates = pl.read_parquet('data/warehouse/dates.parquet') + payees = pl.read_parquet('data/warehouse/payees.parquet') + scheduled_transactions = pl.read_parquet('data/warehouse/scheduled_transactions.parquet') + transactions = pl.read_parquet('data/warehouse/transactions.parquet') +except FileNotFoundError: + logging.error('Data warehouse files not found. 
Run the data pipeline to create them.') + sys.exit(ec.MISSING_DATA_FILES) -# Join transactions with accounts, categories, and payees to create a master DataFrame -master_df = transactions.join(categories, left_on='category_id', right_on='id', suffix='_category')\ - .join(accounts, left_on='account_id', right_on='id', suffix='_account')\ - .join(payees, left_on='payee_id', right_on='id', suffix='_payee')\ - .join(dates, left_on='transaction_date', right_on='date_id', suffix='_date')\ +try: + # Join transactions with categories, accounts, payees, and dates to create a master DataFrame + master_transactions = transactions.join(categories, left_on='category_id', right_on='category_id', suffix='_category')\ + .join(accounts, left_on='account_id', right_on='account_id', suffix='_account')\ + .join(payees, left_on='payee_id', right_on='payee_id', suffix='_payee')\ + .join(dates, left_on='transaction_date', right_on='date_id', suffix='_date') +except Exception as e: + logging.error(f'Error joining DataFrames: {e}') + sys.exit(ec.BAD_JOIN) # Create aggregations -spend_per_day = master_df.sql(''' +spend_per_day = master_transactions.sql(''' SELECT date, year, @@ -34,7 +45,7 @@ spend_per_day = master_df.sql(''' ''' ) -spend_per_category = master_df.sql(''' +spend_per_category = master_transactions.sql(''' SELECT category_name, ABS(SUM(transaction_amount)) as total @@ -45,7 +56,7 @@ spend_per_category = master_df.sql(''' ''' ) -spend_per_payee = master_df.sql(''' +spend_per_payee = master_transactions.sql(''' SELECT payee_name, ABS(SUM(transaction_amount)) as total diff --git a/docs/ERD.md b/docs/ERD.md index 23cc120..4410009 100644 --- a/docs/ERD.md +++ b/docs/ERD.md @@ -34,23 +34,29 @@ erDiagram } DATES { - int date_id - string date + string date_id + date date int year int month int day + boolean is_weekday + int weekday } TRANSACTIONS { - int transaction_id + string transaction_id int account_id int category_id int payee_id - int date_id + string transaction_date decimal amount boolean 
cleared boolean approved boolean deleted + string memo + string flag_color + string transfer_account_id + } SCHEDULED_TRANSACTIONS { @@ -58,10 +64,14 @@ erDiagram int account_id int category_id int payee_id - int date_id + date date_first + date date_next decimal amount string frequency boolean deleted + string memo + string flag_color + string transfer_account_id } TRANSACTIONS ||--o{ ACCOUNTS : "belongs to" @@ -71,6 +81,6 @@ erDiagram SCHEDULED_TRANSACTIONS ||--o{ ACCOUNTS : "belongs to" SCHEDULED_TRANSACTIONS ||--o{ CATEGORIES : "belongs to" SCHEDULED_TRANSACTIONS ||--o{ PAYEES : "belongs to" - SCHEDULED_TRANSACTIONS ||--o{ DATES : "scheduled on" + SCHEDULED_TRANSACTIONS ||--o{ DATES : "first scheduled" + SCHEDULED_TRANSACTIONS ||--o{ DATES : "next scheduled" ``` - diff --git a/docs/Get_Started.md b/docs/Get_Started.md index 05c93f4..d90c2b7 100644 --- a/docs/Get_Started.md +++ b/docs/Get_Started.md @@ -25,7 +25,7 @@ For the `BUDGET_ID`, you can get it from the URL of your budget page on the YNAB ### Clone the repository ```bash -git clone #link tbc +git clone https://github.com/Jake-Pullen/data_pipeline_for_YNAB.git ``` ### Install dependencies diff --git a/docs/dataflow.md b/docs/dataflow.md index 37f0fb7..1f2bcfc 100644 --- a/docs/dataflow.md +++ b/docs/dataflow.md @@ -28,3 +28,7 @@ The Data Warehouse is the data after it has been aggregated and transformed. It ## Processed Archive The Processed Archive is the data after it has been processed and stored in the base tables. It is the raw json files in the `data/processed/` directory with a folder for each entity and file for each load that has been processed. + +## Visualisation Datasets + +When preparing the data for visualisation, the Dash app joins the warehouse tables and builds aggregated DataFrames in memory (for example spend per day, per category, and per payee), which are used to render the charts. These DataFrames are not stored on disk. 
diff --git a/main.py b/main.py index 834983c..41f079a 100644 --- a/main.py +++ b/main.py @@ -8,7 +8,6 @@ import logging.config import logging.handlers import config.exit_codes as ec -from dash_app import app from pipeline.pipeline_main import pipeline_main def set_up_logging(): @@ -58,7 +57,15 @@ config['BUDGET_ID'] = BUDGET_ID if __name__ == '__main__': try: pipeline_main(config) - app.run() #debug=True) + + # Check if the data was successfully created + data_exists = os.path.exists('data/processed') and os.listdir('data/processed') + if data_exists: + from dash_app import app + app.run() # debug=True + else: + logging.error('Data pipeline did not produce any data. Dash app will not run.') + sys.exit(ec.NO_DATA_PRODUCED) except SystemExit as e: exit_code = e.code if exit_code == ec.SUCCESS: diff --git a/pipeline/dimensions.py b/pipeline/dimensions.py index 2bc51e7..146e303 100644 --- a/pipeline/dimensions.py +++ b/pipeline/dimensions.py @@ -22,47 +22,55 @@ class DimAccounts(Dimensions): def transform(self): # Read the parquet file into a polars DataFrame try: - accounts_df = pl.read_parquet(self.file_path) + source_accounts = pl.read_parquet(self.file_path) except Exception as e: logging.error(f"Failed to read the base accounts parquet file: {e}") return - # Transform the DataFrame logging.info("Transforming the accounts DataFrame") try: - accounts_df = ( - accounts_df - .with_columns([ - pl.col("id").alias("account_id"), - pl.col("name").alias("account_name"), - pl.col("type").alias("account_type"), - pl.col("on_budget").alias("on_budget"), - pl.col("closed").alias("closed"), - pl.col("note").alias("note"), - pl.col("balance").alias("balance"), - pl.col("cleared_balance").alias("cleared_balance"), - pl.col("uncleared_balance").alias("uncleared_balance"), - pl.col("deleted").alias("deleted"), - ]) - .with_columns([ - pl.col("note").fill_null("unknown"), - (pl.col("balance") / 100).alias("balance"), - (pl.col("cleared_balance") / 100).alias("cleared_balance"), - 
(pl.col("uncleared_balance") / 100).alias("uncleared_balance"), - ]) - .drop([ - "transfer_payee_id", "direct_import_linked", "direct_import_in_error", - "last_reconciled_at", "debt_original_balance", "debt_interest_rates", - "debt_minimum_payments", "debt_escrow_amounts", "ingestion_date" + base_accounts = ( + source_accounts.select([ + "id", + "name", + "type", + "on_budget", + "closed", + "note", + "balance", + "cleared_balance", + "uncleared_balance", + "deleted" ]) ) + except Exception as e: + logging.error(f"Failed to select columns from the accounts DataFrame: {e}") + return + + try: + add_accounts_prefix = base_accounts.with_columns([ + pl.col("id").alias("account_id"), + pl.col("name").alias("account_name"), + pl.col("type").alias("account_type") + ]) + fill_accounts_null_values = add_accounts_prefix.with_columns([ + pl.col('note').fill_null('none') + ]) + fix_accounts_values = fill_accounts_null_values.with_columns([ + (pl.col("balance") / 1000).alias("balance"), + (pl.col("cleared_balance") / 1000).alias("cleared_balance"), + (pl.col("uncleared_balance") / 1000).alias("uncleared_balance"), + ]) + drop_accounts_columns = fix_accounts_values.drop([ + "id", "name", "type" + ]) except Exception as e: logging.error(f"Failed to transform the accounts DataFrame: {e}") return - # Write the DataFrame to a new parquet file + logging.info("Writing the transformed accounts DataFrame to parquet file") try: - accounts_df.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet') + drop_accounts_columns.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet') except Exception as e: logging.error(f"Failed to write the transformed accounts DataFrame to parquet file: {e}") return @@ -74,15 +82,14 @@ class DimCategories(Dimensions): self.transform() def transform(self): - # Read the parquet file into a polars DataFrame try: - categories_df = pl.read_parquet(self.file_path) + source_categories = pl.read_parquet(self.file_path) except Exception 
as e: logging.error(f"Failed to read the base categories parquet file: {e}") return logging.info("Transforming the categories DataFrame") try: - categories_df = categories_df.select([ + base_categories = source_categories.select([ 'id', 'name', 'category_group_name', @@ -98,29 +105,32 @@ class DimCategories(Dimensions): return try: - # Rename the columns - categories_df = categories_df.with_columns(pl.col('id').alias('category_id')) - categories_df = categories_df.with_columns(pl.col('name').alias('category_name')) - - # Fill null values in the note column - categories_df = categories_df.with_columns(pl.col('note').fill_null('unknown')) - - # Convert the balance, budgeted, and activity columns to decimal - categories_df = categories_df.with_columns(pl.col('balance') / 100) - categories_df = categories_df.with_columns(pl.col('budgeted') / 100) - categories_df = categories_df.with_columns(pl.col('activity') / 100) + add_categories_prefix = base_categories.with_columns([ + pl.col('id').alias('category_id'), + pl.col('name').alias('category_name') + ]) + fill_null_category_values = add_categories_prefix.with_columns([ + pl.col('note').fill_null('none') + ]) + fix_categories_values = fill_null_category_values.with_columns([ + (pl.col('balance') / 1000), + (pl.col('budgeted') / 1000), + (pl.col('activity') / 1000) + ]) + drop_categories_columns = fix_categories_values.drop([ + 'id', 'name' + ]) except Exception as e: logging.error(f"Failed to transform the categories DataFrame: {e}") return - # Write the DataFrame to a new parquet file logging.info("Writing the transformed categories DataFrame to parquet file") try: - categories_df.write_parquet(self.config['warehouse_data_path'] + '/categories.parquet') + drop_categories_columns.write_parquet(self.config['warehouse_data_path'] + '/categories.parquet') except Exception as e: logging.error(f"Failed to write the transformed categories DataFrame to parquet file: {e}") return - + class DimPayees(Dimensions): def 
__init__(self, config): super().__init__(config) @@ -128,15 +138,14 @@ class DimPayees(Dimensions): self.transform() def transform(self): - # Read the parquet file into a polars DataFrame try: - payees_df = pl.read_parquet(self.file_path) + source_payees = pl.read_parquet(self.file_path) except Exception as e: logging.error(f"Failed to read the base payees parquet file: {e}") return logging.info("Transforming the payees DataFrame") try: - payees_df = payees_df.select([ + base_payees = source_payees.select([ 'id', 'name', 'deleted' @@ -144,10 +153,15 @@ class DimPayees(Dimensions): except Exception as e: logging.error(f"Failed to select columns from the payees DataFrame: {e}") return + try: - # Rename the columns - payees_df = payees_df.with_columns(pl.col('id').alias('payee_id')) - payees_df = payees_df.with_columns(pl.col('name').alias('payee_name')) + add_payees_prefix = base_payees.with_columns([ + pl.col('id').alias('payee_id'), + pl.col('name').alias('payee_name') + ]) + drop_payees_columns = add_payees_prefix.drop([ + 'id', 'name' + ]) except Exception as e: logging.error(f"Failed to rename columns in the payees DataFrame: {e}") return @@ -155,7 +169,7 @@ class DimPayees(Dimensions): # Write the DataFrame to a new parquet file logging.info("Writing the transformed payees DataFrame to parquet file") try: - payees_df.write_parquet(self.config['warehouse_data_path'] + '/payees.parquet') + drop_payees_columns.write_parquet(self.config['warehouse_data_path'] + '/payees.parquet') except Exception as e: logging.error(f"Failed to write the transformed payees DataFrame to parquet file: {e}") return @@ -186,7 +200,7 @@ class DimDate(Dimensions): try: # Create a new column to indicate if the date is a weekday or weekend dates_df = dates_df.with_columns([ - (pl.col('weekday') < 5).alias('is_weekday') # True for weekdays (Monday to Friday), False for weekends (Saturday and Sunday) + (pl.col('weekday') < 6).alias('is_weekday') # True for weekdays (Monday to Friday), False 
for weekends (Saturday and Sunday) ]) except Exception as e: logging.error(f"Failed to create a new column to indicate if the date is a weekday or weekend: {e}") diff --git a/pipeline/facts.py b/pipeline/facts.py index 272ef11..1719824 100644 --- a/pipeline/facts.py +++ b/pipeline/facts.py @@ -18,60 +18,68 @@ class FactTransactions(Facts): self.transform() def transform(self): - # Read the parquet file into a polars DataFrame try: - transactions_df = pl.read_parquet(self.file_path) + source_transactions = pl.read_parquet(self.file_path) except FileNotFoundError: logging.error("The transactions DataFrame does not exist") return - - # Transform the DataFrame + + try: + base_transactions = source_transactions.select([ + "id", + "date", + "amount", + "memo", + "cleared", + "approved", + "flag_color", + "account_id", + "payee_id", + "category_id", + "transfer_account_id" + ]) + except Exception as e: + logging.error(f"Failed to select columns from the transactions DataFrame: {e}") + return + logging.info("Transforming the transactions DataFrame") try: - # Ensure the date column is in datetime format - transactions_df = transactions_df.with_columns([ + resolve_transaction_dates = base_transactions.with_columns([ pl.col("date").str.strptime(pl.Date, format="%Y-%m-%d").alias("date") ]) except Exception as e: logging.error(f"Failed to covert the date to date format: {e}") return - + try: - transactions_df = ( - transactions_df - .with_columns([ - pl.col("id").alias("transaction_id"), - (pl.col("date").dt.year().cast(pl.Utf8) + - pl.col("date").dt.month().cast(pl.Utf8).str.zfill(2) + - pl.col("date").dt.day().cast(pl.Utf8).str.zfill(2)).alias("transaction_date"), - pl.col("amount").alias("transaction_amount"), - pl.col("memo").alias("transaction_memo"), - pl.col("cleared").alias("transaction_cleared"), - pl.col("approved").alias("transaction_approved"), - pl.col("flag_color").alias("transaction_flag_color"), - pl.col("account_id").alias("account_id"), - 
pl.col("payee_id").alias("payee_id"), - pl.col("category_id").alias("category_id"), - pl.col("transfer_account_id").alias("transfer_account_id"), - ]) - .with_columns([ - pl.col("memo").fill_null("unknown"), - (pl.col("amount") / 1000).alias("transaction_amount"), - ]) - .drop([ - "transfer_transaction_id", "matched_transaction_id", "import_id", - "subtransactions", "deleted","flag_name","account_name", - "payee_name","category_name","import_payee_name","import_payee_name_original", - "debt_transaction_type","ingestion_date" - ]) - ) + add_transaction_prefix = resolve_transaction_dates.with_columns([ + pl.col("id").alias("transaction_id"), + (pl.col("date").dt.year().cast(pl.Utf8) + + pl.col("date").dt.month().cast(pl.Utf8).str.zfill(2) + + pl.col("date").dt.day().cast(pl.Utf8).str.zfill(2)).alias("transaction_date"), + ]) + fix_transaction_nulls = add_transaction_prefix.with_columns([ + pl.col("memo").fill_null("none"), + pl.col("flag_color").fill_null("none"), + pl.col("transfer_account_id").fill_null("none"), + pl.col("category_id").fill_null("none"), + ]) + fix_transaction_values = fix_transaction_nulls.with_columns([ + (pl.col("amount") / 1000).alias("transaction_amount") + ]) + drop_transaction_columns = fix_transaction_values.drop([ + "id", "date", "amount" + ]) + except Exception as e: logging.error(f"Failed to transform the transactions DataFrame: {e}") return # Write the DataFrame to a new parquet file logging.info("Writing the transformed transactions DataFrame to parquet file") try: - transactions_df.write_parquet(self.config['warehouse_data_path'] + '/transactions.parquet') + drop_transaction_columns.write_parquet( + self.config['warehouse_data_path'] + '/transactions.parquet' + ) except Exception as e: logging.error(f"Failed to write the transformed transactions DataFrame: {e}") @@ -82,46 +90,61 @@ class FactScheduledTransactions(Facts): self.transform() def transform(self): - # Read the parquet file into a polars DataFrame try: - 
scheduled_transactions_df = pl.read_parquet(self.file_path) + source_scheduled = pl.read_parquet(self.file_path) except FileNotFoundError: logging.error("The scheduled transactions DataFrame does not exist") return - # Transform the DataFrame + try: + base_scheduled = source_scheduled.select([ + "id", + "date_first", + "date_next", + "frequency", + "amount", + "memo", + "flag_color", + "account_id", + "payee_id", + "category_id", + "transfer_account_id" + ]) + except Exception as e: + logging.error(f"Failed to select columns from the scheduled transactions DataFrame: {e}") + return + + try: + resolve_scheduled_dates = base_scheduled.with_columns([ + pl.col("date_first").str.strptime(pl.Date, format="%Y-%m-%d").alias("date_first"), + pl.col("date_next").str.strptime(pl.Date, format="%Y-%m-%d").alias("date_next") + ]) + except Exception as e: + logging.error(f"Failed to convert the scheduled transaction dates to date format: {e}") + return + logging.info("Transforming the scheduled transactions DataFrame") try: - scheduled_transactions_df = ( - scheduled_transactions_df - .with_columns([ - pl.col("id").alias("scheduled_transaction_id"), - pl.col("date_first").alias("scheduled_transaction_first_date"), - pl.col("date_next").alias("scheduled_transaction_next_date"), - pl.col("frequency").alias("scheduled_transaction_frequency"), - pl.col("amount").alias("scheduled_transaction_amount"), - pl.col("memo").alias("scheduled_transaction_memo"), - pl.col("flag_color").alias("scheduled_transaction_flag_color"), - pl.col("account_id").alias("account_id"), - pl.col("payee_id").alias("payee_id"), - pl.col("category_id").alias("category_id"), - pl.col("transfer_account_id").alias("transfer_account_id"), - ]) - .with_columns([ - pl.col("memo").fill_null("unknown"), - (pl.col("amount") / 1000).alias("scheduled_transaction_amount"), - ]) - .drop([ - "subtransactions", "deleted","flag_name","account_name", - "payee_name","category_name","ingestion_date" - ]) - ) + add_scheduled_prefix = 
resolve_scheduled_dates.with_columns([ + pl.col("id").alias("scheduled_transaction_id") + ]) + fix_scheduled_nulls = add_scheduled_prefix.with_columns([ + pl.col("memo").fill_null("none"), + pl.col("flag_color").fill_null("none"), + pl.col("transfer_account_id").fill_null("none"), + pl.col("category_id").fill_null("none"), + ]) + fix_scheduled_values = fix_scheduled_nulls.with_columns([ + (pl.col("amount") / 1000).alias("scheduled_transaction_amount"), + ]) + drop_scheduled_columns = fix_scheduled_values.drop([ + "id", "amount" + ]) except Exception as e: logging.error(f"Failed to transform the scheduled transactions DataFrame: {e}") return - # Write the DataFrame to a new parquet file logging.info("Writing the transformed scheduled transactions DataFrame to parquet file") try: - scheduled_transactions_df.write_parquet(self.config['warehouse_data_path'] + '/scheduled_transactions.parquet') + drop_scheduled_columns.write_parquet(self.config['warehouse_data_path'] + '/scheduled_transactions.parquet') except Exception as e: logging.error(f"Failed to write the transformed scheduled transactions DataFrame: {e}")