Merge pull request #16 from Jake-Pullen/document_and_tidy

Document and tidy
This commit is contained in:
Jake-Pullen
2024-09-04 11:51:53 +01:00
committed by GitHub
8 changed files with 215 additions and 143 deletions
+3
View File
@@ -11,3 +11,6 @@ CONFLICT = 9
MOVE_FILE_ERROR = 10 MOVE_FILE_ERROR = 10
DUPLICATE_RESOLUTION_ERROR = 11 DUPLICATE_RESOLUTION_ERROR = 11
UNIQUE_ID_NOT_FOUND = 12 UNIQUE_ID_NOT_FOUND = 12
NO_DATA_PRODUCED = 13
MISSING_DATA_FILES = 14
BAD_JOIN = 15
+18 -7
View File
@@ -5,22 +5,33 @@ import plotly.express as px
from dash import Dash, html, dcc from dash import Dash, html, dcc
import dash_bootstrap_components as dbc import dash_bootstrap_components as dbc
import pandas as pd import pandas as pd
import logging
import sys
import config.exit_codes as ec
try:
accounts = pl.read_parquet('data/warehouse/accounts.parquet') accounts = pl.read_parquet('data/warehouse/accounts.parquet')
categories = pl.read_parquet('data/warehouse/categories.parquet') categories = pl.read_parquet('data/warehouse/categories.parquet')
dates = pl.read_parquet('data/warehouse/dates.parquet') dates = pl.read_parquet('data/warehouse/dates.parquet')
payees = pl.read_parquet('data/warehouse/payees.parquet') payees = pl.read_parquet('data/warehouse/payees.parquet')
scheduled_transactions = pl.read_parquet('data/warehouse/scheduled_transactions.parquet') scheduled_transactions = pl.read_parquet('data/warehouse/scheduled_transactions.parquet')
transactions = pl.read_parquet('data/warehouse/transactions.parquet') transactions = pl.read_parquet('data/warehouse/transactions.parquet')
except FileNotFoundError:
logging.error('Data warehouse files not found. Run the data pipeline to create them.')
sys.exit(ec.MISSING_DATA_FILES)
try:
# Join transactions with accounts, categories, and payees to create a master DataFrame # Join transactions with accounts, categories, and payees to create a master DataFrame
master_df = transactions.join(categories, left_on='category_id', right_on='id', suffix='_category')\ master_transactions = transactions.join(categories, left_on='category_id', right_on='category_id', suffix='_category')\
.join(accounts, left_on='account_id', right_on='id', suffix='_account')\ .join(accounts, left_on='account_id', right_on='account_id', suffix='_account')\
.join(payees, left_on='payee_id', right_on='id', suffix='_payee')\ .join(payees, left_on='payee_id', right_on='payee_id', suffix='_payee')\
.join(dates, left_on='transaction_date', right_on='date_id', suffix='_date')\ .join(dates, left_on='transaction_date', right_on='date_id', suffix='_date')
except Exception as e:
logging.error(f'Error joining DataFrames: {e}')
sys.exit(ec.BAD_JOIN)
# Create aggregations # Create aggregations
spend_per_day = master_df.sql(''' spend_per_day = master_transactions.sql('''
SELECT SELECT
date, date,
year, year,
@@ -34,7 +45,7 @@ spend_per_day = master_df.sql('''
''' '''
) )
spend_per_category = master_df.sql(''' spend_per_category = master_transactions.sql('''
SELECT SELECT
category_name, category_name,
ABS(SUM(transaction_amount)) as total ABS(SUM(transaction_amount)) as total
@@ -45,7 +56,7 @@ spend_per_category = master_df.sql('''
''' '''
) )
spend_per_payee = master_df.sql(''' spend_per_payee = master_transactions.sql('''
SELECT SELECT
payee_name, payee_name,
ABS(SUM(transaction_amount)) as total ABS(SUM(transaction_amount)) as total
+17 -7
View File
@@ -34,23 +34,29 @@ erDiagram
} }
DATES { DATES {
int date_id string date_id
string date date date
int year int year
int month int month
int day int day
boolean is_weekday
int weekday
} }
TRANSACTIONS { TRANSACTIONS {
int transaction_id str transaction_id
int account_id int account_id
int category_id int category_id
int payee_id int payee_id
int date_id int transaction_date
decimal amount decimal amount
boolean cleared boolean cleared
boolean approved boolean approved
boolean deleted boolean deleted
string memo
string flag_color
str transfer_account_id
} }
SCHEDULED_TRANSACTIONS { SCHEDULED_TRANSACTIONS {
@@ -58,10 +64,14 @@ erDiagram
int account_id int account_id
int category_id int category_id
int payee_id int payee_id
int date_id str date_first
str date_next
decimal amount decimal amount
string frequency string frequency
boolean deleted boolean deleted
text memo
string flag_color
str transfer_account_id
} }
TRANSACTIONS ||--o{ ACCOUNTS : "belongs to" TRANSACTIONS ||--o{ ACCOUNTS : "belongs to"
@@ -71,6 +81,6 @@ erDiagram
SCHEDULED_TRANSACTIONS ||--o{ ACCOUNTS : "belongs to" SCHEDULED_TRANSACTIONS ||--o{ ACCOUNTS : "belongs to"
SCHEDULED_TRANSACTIONS ||--o{ CATEGORIES : "belongs to" SCHEDULED_TRANSACTIONS ||--o{ CATEGORIES : "belongs to"
SCHEDULED_TRANSACTIONS ||--o{ PAYEES : "belongs to" SCHEDULED_TRANSACTIONS ||--o{ PAYEES : "belongs to"
SCHEDULED_TRANSACTIONS ||--o{ DATES : "scheduled on" SCHEDULED_TRANSACTIONS ||--o{ DATES : "First Scheduled"
SCHEDULED_TRANSACTIONS ||--o{ DATES : "Next Scheduled"
``` ```
+1 -1
View File
@@ -25,7 +25,7 @@ For the `BUDGET_ID`, you can get it from the URL of your budget page on the YNAB
### Clone the repository ### Clone the repository
```bash ```bash
git clone #link tbc git clone https://github.com/Jake-Pullen/data_pipeline_for_YNAB.git
``` ```
### Install dependencies ### Install dependencies
+4
View File
@@ -28,3 +28,7 @@ The Data Warehouse is the data after it has been aggregated and transformed. It
## Processed Archive ## Processed Archive
The Processed Archive is the data after it has been processed and stored in the base tables. It is the raw json files in the `data/processed/` directory with a folder for each entity and file for each load that has been processed. The Processed Archive is the data after it has been processed and stored in the base tables. It is the raw json files in the `data/processed/` directory with a folder for each entity and file for each load that has been processed.
## Visualisation datasets
When preparing the data for visualisation, we create dataframes in memory that are used to create the visualisations. These are not stored on disk.
+9 -2
View File
@@ -8,7 +8,6 @@ import logging.config
import logging.handlers import logging.handlers
import config.exit_codes as ec import config.exit_codes as ec
from dash_app import app
from pipeline.pipeline_main import pipeline_main from pipeline.pipeline_main import pipeline_main
def set_up_logging(): def set_up_logging():
@@ -58,7 +57,15 @@ config['BUDGET_ID'] = BUDGET_ID
if __name__ == '__main__': if __name__ == '__main__':
try: try:
pipeline_main(config) pipeline_main(config)
app.run() #debug=True)
# Check if the data was successfully created
data_exists = os.path.exists('data/processed') and os.listdir('data/processed')
if data_exists:
from dash_app import app
app.run() # debug=True
else:
logging.error('Data pipeline did not produce any data. Dash app will not run.')
sys.exit(ec.NO_DATA_PRODUCED)
except SystemExit as e: except SystemExit as e:
exit_code = e.code exit_code = e.code
if exit_code == ec.SUCCESS: if exit_code == ec.SUCCESS:
+66 -52
View File
@@ -22,47 +22,55 @@ class DimAccounts(Dimensions):
def transform(self): def transform(self):
# Read the parquet file into a polars DataFrame # Read the parquet file into a polars DataFrame
try: try:
accounts_df = pl.read_parquet(self.file_path) source_accounts = pl.read_parquet(self.file_path)
except Exception as e: except Exception as e:
logging.error(f"Failed to read the base accounts parquet file: {e}") logging.error(f"Failed to read the base accounts parquet file: {e}")
return return
# Transform the DataFrame
logging.info("Transforming the accounts DataFrame") logging.info("Transforming the accounts DataFrame")
try: try:
accounts_df = ( base_accounts = (
accounts_df source_accounts.select([
.with_columns([ "id",
pl.col("id").alias("account_id"), "name",
pl.col("name").alias("account_name"), "type",
pl.col("type").alias("account_type"), "on_budget",
pl.col("on_budget").alias("on_budget"), "closed",
pl.col("closed").alias("closed"), "note",
pl.col("note").alias("note"), "balance",
pl.col("balance").alias("balance"), "cleared_balance",
pl.col("cleared_balance").alias("cleared_balance"), "uncleared_balance",
pl.col("uncleared_balance").alias("uncleared_balance"), "deleted"
pl.col("deleted").alias("deleted"),
])
.with_columns([
pl.col("note").fill_null("unknown"),
(pl.col("balance") / 100).alias("balance"),
(pl.col("cleared_balance") / 100).alias("cleared_balance"),
(pl.col("uncleared_balance") / 100).alias("uncleared_balance"),
])
.drop([
"transfer_payee_id", "direct_import_linked", "direct_import_in_error",
"last_reconciled_at", "debt_original_balance", "debt_interest_rates",
"debt_minimum_payments", "debt_escrow_amounts", "ingestion_date"
]) ])
) )
except Exception as e:
logging.error(f"Failed to select columns from the categories DataFrame: {e}")
return
try:
add_accounts_prefix = base_accounts.with_columns([
pl.col("id").alias("account_id"),
pl.col("name").alias("account_name"),
pl.col("type").alias("account_type")
])
fill_accounts_null_values = add_accounts_prefix.with_columns([
pl.col('note').fill_null('none')
])
fix_accounts_values = fill_accounts_null_values.with_columns([
(pl.col("balance") / 1000).alias("balance"),
(pl.col("cleared_balance") / 1000).alias("cleared_balance"),
(pl.col("uncleared_balance") / 1000).alias("uncleared_balance"),
])
drop_accounts_columns = fix_accounts_values.drop([
"id", "name", "type"
])
except Exception as e: except Exception as e:
logging.error(f"Failed to transform the accounts DataFrame: {e}") logging.error(f"Failed to transform the accounts DataFrame: {e}")
return return
# Write the DataFrame to a new parquet file
logging.info("Writing the transformed accounts DataFrame to parquet file") logging.info("Writing the transformed accounts DataFrame to parquet file")
try: try:
accounts_df.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet') drop_accounts_columns.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet')
except Exception as e: except Exception as e:
logging.error(f"Failed to write the transformed accounts DataFrame to parquet file: {e}") logging.error(f"Failed to write the transformed accounts DataFrame to parquet file: {e}")
return return
@@ -74,15 +82,14 @@ class DimCategories(Dimensions):
self.transform() self.transform()
def transform(self): def transform(self):
# Read the parquet file into a polars DataFrame
try: try:
categories_df = pl.read_parquet(self.file_path) source_categories = pl.read_parquet(self.file_path)
except Exception as e: except Exception as e:
logging.error(f"Failed to read the base categories parquet file: {e}") logging.error(f"Failed to read the base categories parquet file: {e}")
return return
logging.info("Transforming the categories DataFrame") logging.info("Transforming the categories DataFrame")
try: try:
categories_df = categories_df.select([ base_categories = source_categories.select([
'id', 'id',
'name', 'name',
'category_group_name', 'category_group_name',
@@ -98,25 +105,28 @@ class DimCategories(Dimensions):
return return
try: try:
# Rename the columns add_categories_prefix = base_categories.with_columns([
categories_df = categories_df.with_columns(pl.col('id').alias('category_id')) pl.col('id').alias('category_id'),
categories_df = categories_df.with_columns(pl.col('name').alias('category_name')) pl.col('name').alias('category_name')
])
# Fill null values in the note column fill_null_category_values = add_categories_prefix.with_columns([
categories_df = categories_df.with_columns(pl.col('note').fill_null('unknown')) pl.col('note').fill_null('none')
])
# Convert the balance, budgeted, and activity columns to decimal fix_categories_values = fill_null_category_values.with_columns([
categories_df = categories_df.with_columns(pl.col('balance') / 100) (pl.col('balance') / 1000),
categories_df = categories_df.with_columns(pl.col('budgeted') / 100) (pl.col('budgeted') / 1000),
categories_df = categories_df.with_columns(pl.col('activity') / 100) (pl.col('activity') / 1000)
])
drop_categories_columns = fix_categories_values.drop([
'id', 'name'
])
except Exception as e: except Exception as e:
logging.error(f"Failed to transform the categories DataFrame: {e}") logging.error(f"Failed to transform the categories DataFrame: {e}")
return return
# Write the DataFrame to a new parquet file
logging.info("Writing the transformed categories DataFrame to parquet file") logging.info("Writing the transformed categories DataFrame to parquet file")
try: try:
categories_df.write_parquet(self.config['warehouse_data_path'] + '/categories.parquet') drop_categories_columns.write_parquet(self.config['warehouse_data_path'] + '/categories.parquet')
except Exception as e: except Exception as e:
logging.error(f"Failed to write the transformed categories DataFrame to parquet file: {e}") logging.error(f"Failed to write the transformed categories DataFrame to parquet file: {e}")
return return
@@ -128,15 +138,14 @@ class DimPayees(Dimensions):
self.transform() self.transform()
def transform(self): def transform(self):
# Read the parquet file into a polars DataFrame
try: try:
payees_df = pl.read_parquet(self.file_path) source_payees = pl.read_parquet(self.file_path)
except Exception as e: except Exception as e:
logging.error(f"Failed to read the base payees parquet file: {e}") logging.error(f"Failed to read the base payees parquet file: {e}")
return return
logging.info("Transforming the payees DataFrame") logging.info("Transforming the payees DataFrame")
try: try:
payees_df = payees_df.select([ base_payees = source_payees.select([
'id', 'id',
'name', 'name',
'deleted' 'deleted'
@@ -144,10 +153,15 @@ class DimPayees(Dimensions):
except Exception as e: except Exception as e:
logging.error(f"Failed to select columns from the payees DataFrame: {e}") logging.error(f"Failed to select columns from the payees DataFrame: {e}")
return return
try: try:
# Rename the columns add_payees_prefix = base_payees.with_columns([
payees_df = payees_df.with_columns(pl.col('id').alias('payee_id')) pl.col('id').alias('payee_id'),
payees_df = payees_df.with_columns(pl.col('name').alias('payee_name')) pl.col('name').alias('payee_name')
])
drop_payees_columns = add_payees_prefix.drop([
'id', 'name'
])
except Exception as e: except Exception as e:
logging.error(f"Failed to rename columns in the payees DataFrame: {e}") logging.error(f"Failed to rename columns in the payees DataFrame: {e}")
return return
@@ -155,7 +169,7 @@ class DimPayees(Dimensions):
# Write the DataFrame to a new parquet file # Write the DataFrame to a new parquet file
logging.info("Writing the transformed payees DataFrame to parquet file") logging.info("Writing the transformed payees DataFrame to parquet file")
try: try:
payees_df.write_parquet(self.config['warehouse_data_path'] + '/payees.parquet') drop_payees_columns.write_parquet(self.config['warehouse_data_path'] + '/payees.parquet')
except Exception as e: except Exception as e:
logging.error(f"Failed to write the transformed payees DataFrame to parquet file: {e}") logging.error(f"Failed to write the transformed payees DataFrame to parquet file: {e}")
return return
@@ -186,7 +200,7 @@ class DimDate(Dimensions):
try: try:
# Create a new column to indicate if the date is a weekday or weekend # Create a new column to indicate if the date is a weekday or weekend
dates_df = dates_df.with_columns([ dates_df = dates_df.with_columns([
(pl.col('weekday') < 5).alias('is_weekday') # True for weekdays (Monday to Friday), False for weekends (Saturday and Sunday) (pl.col('weekday') < 6).alias('is_weekday') # True for weekdays (Monday to Friday), False for weekends (Saturday and Sunday)
]) ])
except Exception as e: except Exception as e:
logging.error(f"Failed to create a new column to indicate if the date is a weekday or weekend: {e}") logging.error(f"Failed to create a new column to indicate if the date is a weekday or weekend: {e}")
+75 -52
View File
@@ -18,18 +18,33 @@ class FactTransactions(Facts):
self.transform() self.transform()
def transform(self): def transform(self):
# Read the parquet file into a polars DataFrame
try: try:
transactions_df = pl.read_parquet(self.file_path) source_transactions = pl.read_parquet(self.file_path)
except FileNotFoundError: except FileNotFoundError:
logging.error("The transactions DataFrame does not exist") logging.error("The transactions DataFrame does not exist")
return return
# Transform the DataFrame try:
base_transactions = source_transactions.select([
"id",
"date",
"amount",
"memo",
"cleared",
"approved",
"flag_color",
"account_id",
"payee_id",
"category_id",
"transfer_account_id"
])
except Exception as e:
logging.error(f"Failed to select columns from the transactions DataFrame: {e}")
return
logging.info("Transforming the transactions DataFrame") logging.info("Transforming the transactions DataFrame")
try: try:
# Ensure the date column is in datetime format resolve_transaction_dates = base_transactions.with_columns([
transactions_df = transactions_df.with_columns([
pl.col("date").str.strptime(pl.Date, format="%Y-%m-%d").alias("date") pl.col("date").str.strptime(pl.Date, format="%Y-%m-%d").alias("date")
]) ])
except Exception as e: except Exception as e:
@@ -37,41 +52,34 @@ class FactTransactions(Facts):
return return
try: try:
transactions_df = ( add_transaction_prefix = resolve_transaction_dates.with_columns([
transactions_df
.with_columns([
pl.col("id").alias("transaction_id"), pl.col("id").alias("transaction_id"),
(pl.col("date").dt.year().cast(pl.Utf8) + (pl.col("date").dt.year().cast(pl.Utf8) +
pl.col("date").dt.month().cast(pl.Utf8).str.zfill(2) + pl.col("date").dt.month().cast(pl.Utf8).str.zfill(2) +
pl.col("date").dt.day().cast(pl.Utf8).str.zfill(2)).alias("transaction_date"), pl.col("date").dt.day().cast(pl.Utf8).str.zfill(2)).alias("transaction_date"),
pl.col("amount").alias("transaction_amount"),
pl.col("memo").alias("transaction_memo"),
pl.col("cleared").alias("transaction_cleared"),
pl.col("approved").alias("transaction_approved"),
pl.col("flag_color").alias("transaction_flag_color"),
pl.col("account_id").alias("account_id"),
pl.col("payee_id").alias("payee_id"),
pl.col("category_id").alias("category_id"),
pl.col("transfer_account_id").alias("transfer_account_id"),
]) ])
.with_columns([ fix_transaction_nulls = add_transaction_prefix.with_columns([
pl.col("memo").fill_null("unknown"), pl.col("memo").fill_null("none"),
(pl.col("amount") / 1000).alias("transaction_amount"), pl.col("flag_color").fill_null("none"),
pl.col("transfer_account_id").fill_null("none"),
pl.col("category_id").fill_null("none"),
]) ])
.drop([ fix_transaction_values = fix_transaction_nulls.with_columns([
"transfer_transaction_id", "matched_transaction_id", "import_id", (pl.col("amount") / 1000).alias("transaction_amount")
"subtransactions", "deleted","flag_name","account_name",
"payee_name","category_name","import_payee_name","import_payee_name_original",
"debt_transaction_type","ingestion_date"
]) ])
) drop_transaction_columns = fix_transaction_values.drop([
"id", "date", "amount"
])
except Exception as e: except Exception as e:
logging.error(f"Failed to transform the transactions DataFrame: {e}") logging.error(f"Failed to transform the transactions DataFrame: {e}")
return return
# Write the DataFrame to a new parquet file # Write the DataFrame to a new parquet file
logging.info("Writing the transformed transactions DataFrame to parquet file") logging.info("Writing the transformed transactions DataFrame to parquet file")
try: try:
transactions_df.write_parquet(self.config['warehouse_data_path'] + '/transactions.parquet') drop_transaction_columns.write_parquet(
self.config['warehouse_data_path'] + '/transactions.parquet'
)
except Exception as e: except Exception as e:
logging.error(f"Failed to write the transformed transactions DataFrame: {e}") logging.error(f"Failed to write the transformed transactions DataFrame: {e}")
@@ -82,46 +90,61 @@ class FactScheduledTransactions(Facts):
self.transform() self.transform()
def transform(self): def transform(self):
# Read the parquet file into a polars DataFrame
try: try:
scheduled_transactions_df = pl.read_parquet(self.file_path) source_scheduled = pl.read_parquet(self.file_path)
except FileNotFoundError: except FileNotFoundError:
logging.error("The scheduled transactions DataFrame does not exist") logging.error("The scheduled transactions DataFrame does not exist")
return return
# Transform the DataFrame try:
base_scheduled = source_scheduled.select([
"id",
"date_first",
"date_next",
"frequency",
"amount",
"memo",
"flag_color",
"account_id",
"payee_id",
"category_id",
"transfer_account_id"
])
except Exception as e:
logging.error(f"Failed to select columns from the scheduled transactions DataFrame: {e}")
return
try:
resolve_scheduled_dates = base_scheduled.with_columns([
pl.col("date_first").str.strptime(pl.Date, format="%Y-%m-%d").alias("date_first"),
pl.col("date_next").str.strptime(pl.Date, format="%Y-%m-%d").alias("date_next")
])
except Exception as e:
logging.error(f"Failed to covert the date to date format: {e}")
return
logging.info("Transforming the scheduled transactions DataFrame") logging.info("Transforming the scheduled transactions DataFrame")
try: try:
scheduled_transactions_df = ( add_scheduled_prefix = resolve_scheduled_dates.with_columns([
scheduled_transactions_df pl.col("id").alias("scheduled_transaction_id")
.with_columns([
pl.col("id").alias("scheduled_transaction_id"),
pl.col("date_first").alias("scheduled_transaction_first_date"),
pl.col("date_next").alias("scheduled_transaction_next_date"),
pl.col("frequency").alias("scheduled_transaction_frequency"),
pl.col("amount").alias("scheduled_transaction_amount"),
pl.col("memo").alias("scheduled_transaction_memo"),
pl.col("flag_color").alias("scheduled_transaction_flag_color"),
pl.col("account_id").alias("account_id"),
pl.col("payee_id").alias("payee_id"),
pl.col("category_id").alias("category_id"),
pl.col("transfer_account_id").alias("transfer_account_id"),
]) ])
.with_columns([ fix_sheduled_nulls = add_scheduled_prefix.with_columns([
pl.col("memo").fill_null("unknown"), pl.col("memo").fill_null("none"),
pl.col("flag_color").fill_null("none"),
pl.col("transfer_account_id").fill_null("none"),
pl.col("category_id").fill_null("none"),
])
fix_scheduled_values = fix_sheduled_nulls.with_columns([
(pl.col("amount") / 1000).alias("scheduled_transaction_amount"), (pl.col("amount") / 1000).alias("scheduled_transaction_amount"),
]) ])
.drop([ drop_scheduled_columns = fix_scheduled_values.drop([
"subtransactions", "deleted","flag_name","account_name", "id", "amount"
"payee_name","category_name","ingestion_date"
]) ])
)
except Exception as e: except Exception as e:
logging.error(f"Failed to transform the scheduled transactions DataFrame: {e}") logging.error(f"Failed to transform the scheduled transactions DataFrame: {e}")
return return
# Write the DataFrame to a new parquet file
logging.info("Writing the transformed scheduled transactions DataFrame to parquet file") logging.info("Writing the transformed scheduled transactions DataFrame to parquet file")
try: try:
scheduled_transactions_df.write_parquet(self.config['warehouse_data_path'] + '/scheduled_transactions.parquet') drop_scheduled_columns.write_parquet(self.config['warehouse_data_path'] + '/scheduled_transactions.parquet')
except Exception as e: except Exception as e:
logging.error(f"Failed to write the transformed scheduled transactions DataFrame: {e}") logging.error(f"Failed to write the transformed scheduled transactions DataFrame: {e}")