From bd0ebd38e9b0c6b25a25843c45ee0d11696ba960 Mon Sep 17 00:00:00 2001 From: Jake Pullen Date: Thu, 29 Aug 2024 10:39:55 +0100 Subject: [PATCH] tidying up facts and dims removing duplicated columns --- pipeline/dimensions.py | 6 +- pipeline/facts.py | 153 ++++++++++++++++++++++++----------------- 2 files changed, 91 insertions(+), 68 deletions(-) diff --git a/pipeline/dimensions.py b/pipeline/dimensions.py index ee7f9e4..efa1390 100644 --- a/pipeline/dimensions.py +++ b/pipeline/dimensions.py @@ -113,9 +113,9 @@ class DimCategories(Dimensions): pl.col('note').fill_null('none') ]) fix_categories_values = fill_null_category_values.with_columns([ - (pl.col('balance') / 100), - (pl.col('budgeted') / 100), - (pl.col('activity') / 100) + (pl.col('balance') / 1000), + (pl.col('budgeted') / 1000), + (pl.col('activity') / 1000) ]) drop_categories_columns = fix_categories_values.drop([ 'id', 'name' diff --git a/pipeline/facts.py b/pipeline/facts.py index 272ef11..1719824 100644 --- a/pipeline/facts.py +++ b/pipeline/facts.py @@ -18,60 +18,68 @@ class FactTransactions(Facts): self.transform() def transform(self): - # Read the parquet file into a polars DataFrame try: - transactions_df = pl.read_parquet(self.file_path) + source_transactions = pl.read_parquet(self.file_path) except FileNotFoundError: logging.error("The transactions DataFrame does not exist") return - - # Transform the DataFrame + + try: + base_transactions = source_transactions.select([ + "id", + "date", + "amount", + "memo", + "cleared", + "approved", + "flag_color", + "account_id", + "payee_id", + "category_id", + "transfer_account_id" + ]) + except Exception as e: + logging.error(f"Failed to select columns from the transactions DataFrame: {e}") + return + logging.info("Transforming the transactions DataFrame") try: - # Ensure the date column is in datetime format - transactions_df = transactions_df.with_columns([ + resolve_transaction_dates = base_transactions.with_columns([ pl.col("date").str.strptime(pl.Date, format="%Y-%m-%d").alias("date") ]) except Exception as e: logging.error(f"Failed to covert the date to date format: {e}") return - + try: - transactions_df = ( - transactions_df - .with_columns([ - pl.col("id").alias("transaction_id"), - (pl.col("date").dt.year().cast(pl.Utf8) + - pl.col("date").dt.month().cast(pl.Utf8).str.zfill(2) + - pl.col("date").dt.day().cast(pl.Utf8).str.zfill(2)).alias("transaction_date"), - pl.col("amount").alias("transaction_amount"), - pl.col("memo").alias("transaction_memo"), - pl.col("cleared").alias("transaction_cleared"), - pl.col("approved").alias("transaction_approved"), - pl.col("flag_color").alias("transaction_flag_color"), - pl.col("account_id").alias("account_id"), - pl.col("payee_id").alias("payee_id"), - pl.col("category_id").alias("category_id"), - pl.col("transfer_account_id").alias("transfer_account_id"), - ]) - .with_columns([ - pl.col("memo").fill_null("unknown"), - (pl.col("amount") / 1000).alias("transaction_amount"), - ]) - .drop([ - "transfer_transaction_id", "matched_transaction_id", "import_id", - "subtransactions", "deleted","flag_name","account_name", - "payee_name","category_name","import_payee_name","import_payee_name_original", - "debt_transaction_type","ingestion_date" - ]) - ) + add_transaction_prefix = resolve_transaction_dates.with_columns([ + pl.col("id").alias("transaction_id"), + (pl.col("date").dt.year().cast(pl.Utf8) + + pl.col("date").dt.month().cast(pl.Utf8).str.zfill(2) + + pl.col("date").dt.day().cast(pl.Utf8).str.zfill(2)).alias("transaction_date"), + ]) + fix_transaction_nulls = add_transaction_prefix.with_columns([ + pl.col("memo").fill_null("none"), + pl.col("flag_color").fill_null("none"), + pl.col("transfer_account_id").fill_null("none"), + pl.col("category_id").fill_null("none"), + ]) + fix_transaction_values = fix_transaction_nulls.with_columns([ + (pl.col("amount") / 1000).alias("transaction_amount") + ]) + drop_transaction_columns = fix_transaction_values.drop([ + "id", "date", "amount" + ]) + except Exception as e: logging.error(f"Failed to transform the transactions DataFrame: {e}") return # Write the DataFrame to a new parquet file logging.info("Writing the transformed transactions DataFrame to parquet file") try: - transactions_df.write_parquet(self.config['warehouse_data_path'] + '/transactions.parquet') + drop_transaction_columns.write_parquet( + self.config['warehouse_data_path'] + '/transactions.parquet' + ) except Exception as e: logging.error(f"Failed to write the transformed transactions DataFrame: {e}") @@ -82,46 +90,61 @@ class FactScheduledTransactions(Facts): self.transform() def transform(self): - # Read the parquet file into a polars DataFrame try: - scheduled_transactions_df = pl.read_parquet(self.file_path) + source_scheduled = pl.read_parquet(self.file_path) except FileNotFoundError: logging.error("The scheduled transactions DataFrame does not exist") return - # Transform the DataFrame + try: + base_scheduled = source_scheduled.select([ + "id", + "date_first", + "date_next", + "frequency", + "amount", + "memo", + "flag_color", + "account_id", + "payee_id", + "category_id", + "transfer_account_id" + ]) + except Exception as e: + logging.error(f"Failed to select columns from the scheduled transactions DataFrame: {e}") + return + + try: + resolve_scheduled_dates = base_scheduled.with_columns([ + pl.col("date_first").str.strptime(pl.Date, format="%Y-%m-%d").alias("date_first"), + pl.col("date_next").str.strptime(pl.Date, format="%Y-%m-%d").alias("date_next") + ]) + except Exception as e: + logging.error(f"Failed to covert the date to date format: {e}") + return + logging.info("Transforming the scheduled transactions DataFrame") try: - scheduled_transactions_df = ( - scheduled_transactions_df - .with_columns([ - pl.col("id").alias("scheduled_transaction_id"), - pl.col("date_first").alias("scheduled_transaction_first_date"), - pl.col("date_next").alias("scheduled_transaction_next_date"), - pl.col("frequency").alias("scheduled_transaction_frequency"), - pl.col("amount").alias("scheduled_transaction_amount"), - pl.col("memo").alias("scheduled_transaction_memo"), - pl.col("flag_color").alias("scheduled_transaction_flag_color"), - pl.col("account_id").alias("account_id"), - pl.col("payee_id").alias("payee_id"), - pl.col("category_id").alias("category_id"), - pl.col("transfer_account_id").alias("transfer_account_id"), - ]) - .with_columns([ - pl.col("memo").fill_null("unknown"), - (pl.col("amount") / 1000).alias("scheduled_transaction_amount"), - ]) - .drop([ - "subtransactions", "deleted","flag_name","account_name", - "payee_name","category_name","ingestion_date" - ]) - ) + add_scheduled_prefix = resolve_scheduled_dates.with_columns([ + pl.col("id").alias("scheduled_transaction_id") + ]) + fix_sheduled_nulls = add_scheduled_prefix.with_columns([ + pl.col("memo").fill_null("none"), + pl.col("flag_color").fill_null("none"), + pl.col("transfer_account_id").fill_null("none"), + pl.col("category_id").fill_null("none"), + ]) + fix_scheduled_values = fix_sheduled_nulls.with_columns([ + (pl.col("amount") / 1000).alias("scheduled_transaction_amount"), + ]) + drop_scheduled_columns = fix_scheduled_values.drop([ + "id", "amount" + ]) except Exception as e: logging.error(f"Failed to transform the scheduled transactions DataFrame: {e}") return - # Write the DataFrame to a new parquet file logging.info("Writing the transformed scheduled transactions DataFrame to parquet file") try: - scheduled_transactions_df.write_parquet(self.config['warehouse_data_path'] + '/scheduled_transactions.parquet') + drop_scheduled_columns.write_parquet(self.config['warehouse_data_path'] + '/scheduled_transactions.parquet') except Exception as e: logging.error(f"Failed to write the transformed scheduled transactions DataFrame: {e}")