From 696b72988d8595901e50bccd275bb9848e68d003 Mon Sep 17 00:00:00 2001
From: Jake Pullen
Date: Sat, 10 Aug 2024 08:07:14 +0100
Subject: [PATCH] added facts

---
Notes (review; this section is not part of the commit):
    This copy of the patch was reflowed from a whitespace-collapsed text.
    Line breaks, Python indentation and blank context lines are restored by
    convention, and one missing "+" marker (the "transaction_memo" alias
    line) is re-added so that pipeline/facts.py again has the 103 added
    lines its hunk header declares. The author address is absent from the
    From: header, and the last hunk (test.py) stops short of the 7 lines its
    header declares; both need the original commit to complete. Verify
    whitespace against the original commit before applying.

    pipeline/facts.py, as committed:
    - In both transform() methods the first with_columns() adds aliased
      copies (transaction_id, transaction_memo, ...). The source columns
      id, date, amount, memo and flag_color (plus cleared and approved for
      transactions) are not in the drop() list, so both the original and
      the aliased names are written out. Confirm whether a rename was
      intended.
    - fill_null("unknown") is applied to "memo", not to the aliased
      transaction_memo / scheduled_transaction_memo column created one
      step earlier, so the aliased memo column keeps its nulls.
    - Aliasing account_id, payee_id, category_id and transfer_account_id
      to their own names leaves those columns unchanged.
    - FactScheduledTransactions.transform() logs and returns on
      FileNotFoundError; FactTransactions.transform() lets the error
      propagate. Confirm the asymmetry is intended.
    - Both classes drop the same column list. Whether
      scheduled_transactions.parquet contains every listed column is not
      visible in this patch; verify, since a missing column may make
      drop() raise.
    - amount is divided by 100; the unit of the incoming amount is not
      shown in this patch. Confirm the divisor.
    - "from datetime import date" is not referenced in the new file.

 main.py           |   3 ++
 pipeline/facts.py | 103 ++++++++++++++++++++++++++++++++++++++++++++++
 test.py           |   2 +-
 3 files changed, 107 insertions(+), 1 deletion(-)
 create mode 100644 pipeline/facts.py

diff --git a/main.py b/main.py
index 40fc585..8ebc54d 100644
--- a/main.py
+++ b/main.py
@@ -6,6 +6,7 @@ import yaml
 from pipeline.ingest import Ingest
 from pipeline.raw_to_base import RawToBase
 from pipeline.dimensions import DimAccounts, DimCategories, DimPayees, DimDate
+from pipeline.facts import FactTransactions, FactScheduledTransactions
 
 dotenv.load_dotenv()
 
@@ -26,3 +27,5 @@ if __name__ == '__main__':
     DimCategories(config)
     DimPayees(config)
     DimDate(config)
+    FactTransactions(config)
+    FactScheduledTransactions(config)
diff --git a/pipeline/facts.py b/pipeline/facts.py
new file mode 100644
index 0000000..666a3c3
--- /dev/null
+++ b/pipeline/facts.py
@@ -0,0 +1,103 @@
+import polars as pl
+import logging
+import os
+from datetime import date
+
+class Facts:
+    def __init__(self, config):
+        self.config = config
+        self.base_file_path = self.config['base_data_path']
+        os.makedirs(self.config['warehouse_data_path'], exist_ok=True)
+
+    def get_full_file_path(self, file_name):
+        return f"{self.base_file_path}/{file_name}"
+
+class FactTransactions(Facts):
+
+    def __init__(self, config):
+        super().__init__(config)
+        self.file_path = self.get_full_file_path('transactions.parquet')
+        self.transform()
+
+    def transform(self):
+        # Read the parquet file into a polars DataFrame
+        transactions_df = pl.read_parquet(self.file_path)
+
+        # Transform the DataFrame
+        logging.info("Transforming the transactions DataFrame")
+        transactions_df = (
+            transactions_df
+            .with_columns([
+                pl.col("id").alias("transaction_id"),
+                pl.col("date").alias("transaction_date"),
+                pl.col("amount").alias("transaction_amount"),
+                pl.col("memo").alias("transaction_memo"),
+                pl.col("cleared").alias("transaction_cleared"),
+                pl.col("approved").alias("transaction_approved"),
+                pl.col("flag_color").alias("transaction_flag_color"),
+                pl.col("account_id").alias("account_id"),
+                pl.col("payee_id").alias("payee_id"),
+                pl.col("category_id").alias("category_id"),
+                pl.col("transfer_account_id").alias("transfer_account_id"),
+            ])
+            .with_columns([
+                pl.col("memo").fill_null("unknown"),
+                (pl.col("amount") / 100).alias("transaction_amount"),
+            ])
+            .drop([
+                "transfer_transaction_id", "matched_transaction_id", "import_id",
+                "subtransactions", "deleted","flag_name","account_name",
+                "payee_name","category_name","import_payee_name","import_payee_name_original",
+                "debt_transaction_type","ingestion_date"
+            ])
+        )
+
+        # Write the DataFrame to a new parquet file
+        logging.info("Writing the transformed transactions DataFrame to parquet file")
+        transactions_df.write_parquet(self.config['warehouse_data_path'] + '/transactions.parquet')
+
+class FactScheduledTransactions(Facts):
+
+    def __init__(self, config):
+        super().__init__(config)
+        self.file_path = self.get_full_file_path('scheduled_transactions.parquet')
+        self.transform()
+
+    def transform(self):
+        # Read the parquet file into a polars DataFrame
+        try:
+            scheduled_transactions_df = pl.read_parquet(self.file_path)
+        except FileNotFoundError:
+            logging.error("The scheduled transactions DataFrame does not exist")
+            return
+
+        # Transform the DataFrame
+        logging.info("Transforming the scheduled transactions DataFrame")
+        scheduled_transactions_df = (
+            scheduled_transactions_df
+            .with_columns([
+                pl.col("id").alias("scheduled_transaction_id"),
+                pl.col("date").alias("scheduled_transaction_date"),
+                pl.col("amount").alias("scheduled_transaction_amount"),
+                pl.col("memo").alias("scheduled_transaction_memo"),
+                pl.col("flag_color").alias("scheduled_transaction_flag_color"),
+                pl.col("account_id").alias("account_id"),
+                pl.col("payee_id").alias("payee_id"),
+                pl.col("category_id").alias("category_id"),
+                pl.col("transfer_account_id").alias("transfer_account_id"),
+            ])
+            .with_columns([
+                pl.col("memo").fill_null("unknown"),
+                (pl.col("amount") / 100).alias("scheduled_transaction_amount"),
+            ])
+            .drop([
+                "transfer_transaction_id", "matched_transaction_id", "import_id",
+                "subtransactions", "deleted","flag_name","account_name",
+                "payee_name","category_name","import_payee_name","import_payee_name_original",
+                "debt_transaction_type","ingestion_date"
+            ])
+        )
+
+        # Write the DataFrame to a new parquet file
+        logging.info("Writing the transformed scheduled transactions DataFrame to parquet file")
+        scheduled_transactions_df.write_parquet(self.config['warehouse_data_path'] + '/scheduled_transactions.parquet')
diff --git a/test.py b/test.py
index 59c1c7f..f0b8b60 100644
--- a/test.py
+++ b/test.py
@@ -1,7 +1,7 @@
 import polars as pl
 
 #entities = ['accounts', 'categories', 'months', 'payees', 'transactions', 'scheduled_transactions']
-entities = ['payees']
+entities = ['transactions']
 
 for entity in entities: