tidied up dimensions and removed duplicated columns

This commit is contained in:
Jake Pullen
2024-08-29 09:23:30 +01:00
parent d999a8175c
commit 9e7ff808a5
3 changed files with 72 additions and 54 deletions
+4
View File
@@ -28,3 +28,7 @@ The Data Warehouse is the data after it has been aggregated and transformed. It
## Processed Archive ## Processed Archive
The Processed Archive is the data after it has been processed and stored in the base tables. It is the raw json files in the `data/processed/` directory with a folder for each entity and file for each load that has been processed. The Processed Archive is the data after it has been processed and stored in the base tables. It is the raw json files in the `data/processed/` directory with a folder for each entity and file for each load that has been processed.
## Visualisation datasets
When preparing the data for visualisation, we create dataframes in memory that are used to create the visualisations. These are not stored on disk.
+2 -2
View File
@@ -8,7 +8,7 @@ import logging.config
import logging.handlers import logging.handlers
import config.exit_codes as ec import config.exit_codes as ec
from dash_app import app #from dash_app import app
from pipeline.pipeline_main import pipeline_main from pipeline.pipeline_main import pipeline_main
def set_up_logging(): def set_up_logging():
@@ -58,7 +58,7 @@ config['BUDGET_ID'] = BUDGET_ID
if __name__ == '__main__': if __name__ == '__main__':
try: try:
pipeline_main(config) pipeline_main(config)
app.run() #debug=True) # app.run() #debug=True)
except SystemExit as e: except SystemExit as e:
exit_code = e.code exit_code = e.code
if exit_code == ec.SUCCESS: if exit_code == ec.SUCCESS:
+66 -52
View File
@@ -22,47 +22,55 @@ class DimAccounts(Dimensions):
def transform(self): def transform(self):
# Read the parquet file into a polars DataFrame # Read the parquet file into a polars DataFrame
try: try:
accounts_df = pl.read_parquet(self.file_path) source_accounts = pl.read_parquet(self.file_path)
except Exception as e: except Exception as e:
logging.error(f"Failed to read the base accounts parquet file: {e}") logging.error(f"Failed to read the base accounts parquet file: {e}")
return return
# Transform the DataFrame
logging.info("Transforming the accounts DataFrame") logging.info("Transforming the accounts DataFrame")
try: try:
accounts_df = ( base_accounts = (
accounts_df source_accounts.select([
.with_columns([ "id",
pl.col("id").alias("account_id"), "name",
pl.col("name").alias("account_name"), "type",
pl.col("type").alias("account_type"), "on_budget",
pl.col("on_budget").alias("on_budget"), "closed",
pl.col("closed").alias("closed"), "note",
pl.col("note").alias("note"), "balance",
pl.col("balance").alias("balance"), "cleared_balance",
pl.col("cleared_balance").alias("cleared_balance"), "uncleared_balance",
pl.col("uncleared_balance").alias("uncleared_balance"), "deleted"
pl.col("deleted").alias("deleted"),
])
.with_columns([
pl.col("note").fill_null("unknown"),
(pl.col("balance") / 100).alias("balance"),
(pl.col("cleared_balance") / 100).alias("cleared_balance"),
(pl.col("uncleared_balance") / 100).alias("uncleared_balance"),
])
.drop([
"transfer_payee_id", "direct_import_linked", "direct_import_in_error",
"last_reconciled_at", "debt_original_balance", "debt_interest_rates",
"debt_minimum_payments", "debt_escrow_amounts", "ingestion_date"
]) ])
) )
except Exception as e:
logging.error(f"Failed to select columns from the categories DataFrame: {e}")
return
try:
add_accounts_prefix = base_accounts.with_columns([
pl.col("id").alias("account_id"),
pl.col("name").alias("account_name"),
pl.col("type").alias("account_type")
])
fill_accounts_null_values = add_accounts_prefix.with_columns([
pl.col('note').fill_null('none')
])
fix_accounts_values = fill_accounts_null_values.with_columns([
(pl.col("balance") / 1000).alias("balance"),
(pl.col("cleared_balance") / 1000).alias("cleared_balance"),
(pl.col("uncleared_balance") / 1000).alias("uncleared_balance"),
])
drop_accounts_columns = fix_accounts_values.drop([
"id", "name", "type"
])
except Exception as e: except Exception as e:
logging.error(f"Failed to transform the accounts DataFrame: {e}") logging.error(f"Failed to transform the accounts DataFrame: {e}")
return return
# Write the DataFrame to a new parquet file
logging.info("Writing the transformed accounts DataFrame to parquet file") logging.info("Writing the transformed accounts DataFrame to parquet file")
try: try:
accounts_df.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet') drop_accounts_columns.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet')
except Exception as e: except Exception as e:
logging.error(f"Failed to write the transformed accounts DataFrame to parquet file: {e}") logging.error(f"Failed to write the transformed accounts DataFrame to parquet file: {e}")
return return
@@ -74,15 +82,14 @@ class DimCategories(Dimensions):
self.transform() self.transform()
def transform(self): def transform(self):
# Read the parquet file into a polars DataFrame
try: try:
categories_df = pl.read_parquet(self.file_path) source_categories = pl.read_parquet(self.file_path)
except Exception as e: except Exception as e:
logging.error(f"Failed to read the base categories parquet file: {e}") logging.error(f"Failed to read the base categories parquet file: {e}")
return return
logging.info("Transforming the categories DataFrame") logging.info("Transforming the categories DataFrame")
try: try:
categories_df = categories_df.select([ base_categories = source_categories.select([
'id', 'id',
'name', 'name',
'category_group_name', 'category_group_name',
@@ -98,29 +105,32 @@ class DimCategories(Dimensions):
return return
try: try:
# Rename the columns add_categories_prefix = base_categories.with_columns([
categories_df = categories_df.with_columns(pl.col('id').alias('category_id')) pl.col('id').alias('category_id'),
categories_df = categories_df.with_columns(pl.col('name').alias('category_name')) pl.col('name').alias('category_name')
])
# Fill null values in the note column fill_null_category_values = add_categories_prefix.with_columns([
categories_df = categories_df.with_columns(pl.col('note').fill_null('unknown')) pl.col('note').fill_null('none')
])
# Convert the balance, budgeted, and activity columns to decimal fix_categories_values = fill_null_category_values.with_columns([
categories_df = categories_df.with_columns(pl.col('balance') / 100) (pl.col('balance') / 100),
categories_df = categories_df.with_columns(pl.col('budgeted') / 100) (pl.col('budgeted') / 100),
categories_df = categories_df.with_columns(pl.col('activity') / 100) (pl.col('activity') / 100)
])
drop_categories_columns = fix_categories_values.drop([
'id', 'name'
])
except Exception as e: except Exception as e:
logging.error(f"Failed to transform the categories DataFrame: {e}") logging.error(f"Failed to transform the categories DataFrame: {e}")
return return
# Write the DataFrame to a new parquet file
logging.info("Writing the transformed categories DataFrame to parquet file") logging.info("Writing the transformed categories DataFrame to parquet file")
try: try:
categories_df.write_parquet(self.config['warehouse_data_path'] + '/categories.parquet') drop_categories_columns.write_parquet(self.config['warehouse_data_path'] + '/categories.parquet')
except Exception as e: except Exception as e:
logging.error(f"Failed to write the transformed categories DataFrame to parquet file: {e}") logging.error(f"Failed to write the transformed categories DataFrame to parquet file: {e}")
return return
class DimPayees(Dimensions): class DimPayees(Dimensions):
def __init__(self, config): def __init__(self, config):
super().__init__(config) super().__init__(config)
@@ -128,15 +138,14 @@ class DimPayees(Dimensions):
self.transform() self.transform()
def transform(self): def transform(self):
# Read the parquet file into a polars DataFrame
try: try:
payees_df = pl.read_parquet(self.file_path) source_payees = pl.read_parquet(self.file_path)
except Exception as e: except Exception as e:
logging.error(f"Failed to read the base payees parquet file: {e}") logging.error(f"Failed to read the base payees parquet file: {e}")
return return
logging.info("Transforming the payees DataFrame") logging.info("Transforming the payees DataFrame")
try: try:
payees_df = payees_df.select([ base_payees = source_payees.select([
'id', 'id',
'name', 'name',
'deleted' 'deleted'
@@ -144,10 +153,15 @@ class DimPayees(Dimensions):
except Exception as e: except Exception as e:
logging.error(f"Failed to select columns from the payees DataFrame: {e}") logging.error(f"Failed to select columns from the payees DataFrame: {e}")
return return
try: try:
# Rename the columns add_payees_prefix = base_payees.with_columns([
payees_df = payees_df.with_columns(pl.col('id').alias('payee_id')) pl.col('id').alias('payee_id'),
payees_df = payees_df.with_columns(pl.col('name').alias('payee_name')) pl.col('name').alias('payee_name')
])
drop_payees_columns = add_payees_prefix.drop([
'id', 'name'
])
except Exception as e: except Exception as e:
logging.error(f"Failed to rename columns in the payees DataFrame: {e}") logging.error(f"Failed to rename columns in the payees DataFrame: {e}")
return return
@@ -155,7 +169,7 @@ class DimPayees(Dimensions):
# Write the DataFrame to a new parquet file # Write the DataFrame to a new parquet file
logging.info("Writing the transformed payees DataFrame to parquet file") logging.info("Writing the transformed payees DataFrame to parquet file")
try: try:
payees_df.write_parquet(self.config['warehouse_data_path'] + '/payees.parquet') drop_payees_columns.write_parquet(self.config['warehouse_data_path'] + '/payees.parquet')
except Exception as e: except Exception as e:
logging.error(f"Failed to write the transformed payees DataFrame to parquet file: {e}") logging.error(f"Failed to write the transformed payees DataFrame to parquet file: {e}")
return return