From b7b6f8887c954a8282c82ef4eacd3abfa213e467 Mon Sep 17 00:00:00 2001 From: Jake Pullen Date: Fri, 9 Aug 2024 16:27:36 +0100 Subject: [PATCH] dimensions done, still need to fully test --- main.py | 15 +++-- pipeline/dimAccounts.py | 41 ------------ pipeline/dimensions.py | 144 ++++++++++++++++++++++++++++++++++++++++ test.py | 27 ++++---- 4 files changed, 169 insertions(+), 58 deletions(-) delete mode 100644 pipeline/dimAccounts.py create mode 100644 pipeline/dimensions.py diff --git a/main.py b/main.py index 5fb597d..024c31c 100644 --- a/main.py +++ b/main.py @@ -5,7 +5,7 @@ import yaml from pipeline.ingest import Ingest from pipeline.raw_to_base import RawToBase -from pipeline.dimAccounts import DimAccounts +from pipeline.dimensions import DimAccounts, DimCategories, DimPayees, DimDate dotenv.load_dotenv() @@ -19,6 +19,13 @@ with open('config.yaml', 'r') as file: config['API_TOKEN'] = API_TOKEN config['BUDGET_ID'] = BUDGET_ID -Ingest(config) -RawToBase(config) -DimAccounts(config) \ No newline at end of file +if __name__ == '__main__': + #Ingest(config) + #RawToBase(config) + #DimAccounts(config) + #DimCategories(config) + #DimPayees(config) + DimDate(config) + + +# dates_df = pl.DataFrame(pl.date_range('2020-01-01', '2030-12-31',"1d", eager=True)).alias('date') \ No newline at end of file diff --git a/pipeline/dimAccounts.py b/pipeline/dimAccounts.py deleted file mode 100644 index 6ffb2a6..0000000 --- a/pipeline/dimAccounts.py +++ /dev/null @@ -1,41 +0,0 @@ -import polars as pl -class DimAccounts: - def __init__(self, config): - self.config = config - self.transform() - - def transform(self): - file_path = self.config['base_data_path'] + '/accounts.parquet' - # Read the parquet file into a polars DataFrame - accounts_df = pl.read_parquet(file_path) - - # Transform the DataFrame - accounts_df = ( - accounts_df - .with_columns([ - pl.col("id").alias("account_id"), - pl.col("name").alias("account_name"), - pl.col("type").alias("account_type"), - pl.col("on_budget").alias("on_budget"), - pl.col("closed").alias("closed"), - pl.col("note").alias("note"), - pl.col("balance").alias("balance"), - pl.col("cleared_balance").alias("cleared_balance"), - pl.col("uncleared_balance").alias("uncleared_balance"), - pl.col("deleted").alias("deleted"), - ]) - .with_columns([ - pl.col("note").fill_null("unknown"), - (pl.col("balance") / 100).alias("balance"), - (pl.col("cleared_balance") / 100).alias("cleared_balance"), - (pl.col("uncleared_balance") / 100).alias("uncleared_balance"), - ]) - .drop([ - "transfer_payee_id", "direct_import_linked", "direct_import_in_error", - "last_reconciled_at", "debt_original_balance", "debt_interest_rates", - "debt_minimum_payments", "debt_escrow_amounts", "ingestion_date" - ]) - ) - # Write the DataFrame to a new parquet file - accounts_df.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet') - diff --git a/pipeline/dimensions.py b/pipeline/dimensions.py new file mode 100644 index 0000000..5753ee8 --- /dev/null +++ b/pipeline/dimensions.py @@ -0,0 +1,144 @@ +import polars as pl +import logging +import os +from datetime import date +class Dimensions: + def __init__(self, config): + self.config = config + self.base_file_path = self.config['base_data_path'] + os.makedirs(self.config['warehouse_data_path'], exist_ok=True) + + def get_full_file_path(self, file_name): + return f"{self.base_file_path}/{file_name}" + + +class DimAccounts(Dimensions): + def __init__(self, config): + super().__init__(config) + self.file_path = self.get_full_file_path('accounts.parquet') + self.transform() + + def transform(self): + # Read the parquet file into a polars DataFrame + accounts_df = pl.read_parquet(self.file_path) + + # Transform the DataFrame + logging.info("Transforming the accounts DataFrame") + accounts_df = ( + accounts_df + .with_columns([ + pl.col("id").alias("account_id"), + pl.col("name").alias("account_name"), + pl.col("type").alias("account_type"), + pl.col("on_budget").alias("on_budget"), + pl.col("closed").alias("closed"), + pl.col("note").alias("note"), + pl.col("balance").alias("balance"), + pl.col("cleared_balance").alias("cleared_balance"), + pl.col("uncleared_balance").alias("uncleared_balance"), + pl.col("deleted").alias("deleted"), + ]) + .with_columns([ + pl.col("note").fill_null("unknown"), + (pl.col("balance") / 100).alias("balance"), + (pl.col("cleared_balance") / 100).alias("cleared_balance"), + (pl.col("uncleared_balance") / 100).alias("uncleared_balance"), + ]) + .drop([ + "transfer_payee_id", "direct_import_linked", "direct_import_in_error", + "last_reconciled_at", "debt_original_balance", "debt_interest_rates", + "debt_minimum_payments", "debt_escrow_amounts", "ingestion_date" + ]) + ) + # Write the DataFrame to a new parquet file + logging.info("Writing the transformed accounts DataFrame to parquet file") + accounts_df.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet') + + +class DimCategories(Dimensions): + def __init__(self, config): + super().__init__(config) + self.file_path = self.get_full_file_path('categories.parquet') + self.transform() + + def transform(self): + # Read the parquet file into a polars DataFrame + categories_df = pl.read_parquet(self.file_path) + logging.info("Transforming the categories DataFrame") + # Select the required columns + categories_df = categories_df.select([ + 'id', + 'name', + 'category_group_name', + 'hidden', + 'note', + 'budgeted', + 'activity', + 'balance', + 'deleted' + ]) + # Rename the columns + categories_df = categories_df.with_columns(pl.col('id').alias('category_id')) + categories_df = categories_df.with_columns(pl.col('name').alias('category_name')) + + # Fill null values in the note column + categories_df = categories_df.with_columns(pl.col('note').fill_null('unknown')) + + # Convert the balance, budgeted, and activity columns to decimal + categories_df = categories_df.with_columns(pl.col('balance') / 100) + categories_df = categories_df.with_columns(pl.col('budgeted') / 100) + categories_df = categories_df.with_columns(pl.col('activity') / 100) + + # Write the DataFrame to a new parquet file + logging.info("Writing the transformed categories DataFrame to parquet file") + categories_df.write_parquet(self.config['warehouse_data_path'] + '/categories.parquet') + +class DimPayees(Dimensions): + def __init__(self, config): + super().__init__(config) + self.file_path = self.get_full_file_path('payees.parquet') + self.transform() + + def transform(self): + # Read the parquet file into a polars DataFrame + payees_df = pl.read_parquet(self.file_path) + logging.info("Transforming the payees DataFrame") + # Select the required columns + payees_df = payees_df.select([ + 'id', + 'name', + 'deleted' + ]) + # Rename the columns + payees_df = payees_df.with_columns(pl.col('id').alias('payee_id')) + payees_df = payees_df.with_columns(pl.col('name').alias('payee_name')) + + # Write the DataFrame to a new parquet file + logging.info("Writing the transformed payees DataFrame to parquet file") + payees_df.write_parquet(self.config['warehouse_data_path'] + '/payees.parquet') + + +class DimDate(Dimensions): + def __init__(self, config): + super().__init__(config) + self.transform() + + def transform(self): + # Create a DataFrame with dates from 2020-01-01 to 2030-12-31 + dates_df = pl.DataFrame({'date':pl.date_range(date(2020, 1, 1), date(2030, 12, 31), "1d", eager=True)}) + + # Extract year, month, day, and weekday from the date column + dates_df = dates_df.with_columns([ + pl.col('date').dt.year().alias('year'), + pl.col('date').dt.month().alias('month'), + pl.col('date').dt.day().alias('day'), + pl.col('date').dt.weekday().alias('weekday') + ]) + # Create a new column to indicate if the date is a weekday or weekend + dates_df = dates_df.with_columns([ + (pl.col('weekday') < 5).alias('is_weekday') # True for weekdays (Monday to Friday), False for weekends (Saturday and Sunday) + ]) + # Write the DataFrame to a new parquet file + logging.info("Writing the transformed dates DataFrame to parquet file") + dates_df.write_parquet(self.config['warehouse_data_path'] + '/dates.parquet') + diff --git a/test.py b/test.py index 44f707c..59c1c7f 100644 --- a/test.py +++ b/test.py @@ -1,6 +1,8 @@ import polars as pl -entities = ['accounts']#, 'categories', 'months', 'payees', 'transactions', 'scheduled_transactions'] +#entities = ['accounts', 'categories', 'months', 'payees', 'transactions', 'scheduled_transactions'] +entities = ['payees'] + for entity in entities: # print(f"Processing entity: {entity}") @@ -14,15 +16,14 @@ for entity in entities: print(f"First few rows of {entity} DataFrame:") print(entity_df.head()) -for entity in entities: - # print(f"Processing entity: {entity}") - file_path = f'data/warehouse/{entity}.parquet' - # Read the parquet file into a polars DataFrame - entity_df = pl.read_parquet(file_path) - # Print the schema of the DataFrame - print(f"Schema of {entity} DataFrame:") - print(entity_df.schema) - # Display the first few rows of the DataFrame - print(f"First few rows of {entity} DataFrame:") - with pl. - print(entity_df) +# for entity in entities: +# # print(f"Processing entity: {entity}") +# file_path = f'data/warehouse/{entity}.parquet' +# # Read the parquet file into a polars DataFrame +# entity_df = pl.read_parquet(file_path) +# # Print the schema of the DataFrame +# print(f"Schema of {entity} DataFrame:") +# print(entity_df.schema) +# # Display the first few rows of the DataFrame +# print(f"First few rows of {entity} DataFrame:") +# print(entity_df)