diff --git a/.gitignore b/.gitignore index 049d696..e39bb04 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ data/* .venv/* __pycache__/* */__pycache__/* +*.pbix \ No newline at end of file diff --git a/config.yaml b/config.yaml index 8d35b75..1f1b30f 100644 --- a/config.yaml +++ b/config.yaml @@ -19,4 +19,8 @@ primary_keys: transactions: unique_id: id scheduled_transactions: - unique_id: id \ No newline at end of file + unique_id: id +raw_data_path: data/raw +processed_data_path: data/processed +base_data_path: data/base +warehouse_data_path: data/warehouse \ No newline at end of file diff --git a/main.py b/main.py index 6448b39..5fb597d 100644 --- a/main.py +++ b/main.py @@ -5,6 +5,7 @@ import yaml from pipeline.ingest import Ingest from pipeline.raw_to_base import RawToBase +from pipeline.dimAccounts import DimAccounts dotenv.load_dotenv() @@ -19,4 +20,5 @@ config['API_TOKEN'] = API_TOKEN config['BUDGET_ID'] = BUDGET_ID Ingest(config) -RawToBase(config) \ No newline at end of file +RawToBase(config) +DimAccounts(config) \ No newline at end of file diff --git a/pipeline/dimAccounts.py b/pipeline/dimAccounts.py index fbbf42c..6ffb2a6 100644 --- a/pipeline/dimAccounts.py +++ b/pipeline/dimAccounts.py @@ -1,60 +1,41 @@ -# This file is used to define the dimension table for the accounts table -# The accounts table contains information about the accounts in the budget -# The accounts table has the following columns: -# - id: the unique identifier for the account -# - name: the name of the account -# - type: the type of the account (e.g. checking, savings, credit card) -# - on_budget: a boolean indicating whether the account is on budget -# - closed: a boolean indicating whether the account is closed -# - note: a note associated with the account -# - balance: the current balance of the account -# - cleared_balance: the cleared balance of the account -# - uncleared_balance: the uncleared balance of the account -# - deleted: a boolean indicating whether the account has been deleted +import polars as pl +class DimAccounts: + def __init__(self, config): + self.config = config + self.transform() + def transform(self): + file_path = self.config['base_data_path'] + '/accounts.parquet' + # Read the parquet file into a polars DataFrame + accounts_df = pl.read_parquet(file_path) -# the below is mega tbc + # Transform the DataFrame + accounts_df = ( + accounts_df + .with_columns([ + pl.col("id").alias("account_id"), + pl.col("name").alias("account_name"), + pl.col("type").alias("account_type"), + pl.col("on_budget").alias("on_budget"), + pl.col("closed").alias("closed"), + pl.col("note").alias("note"), + pl.col("balance").alias("balance"), + pl.col("cleared_balance").alias("cleared_balance"), + pl.col("uncleared_balance").alias("uncleared_balance"), + pl.col("deleted").alias("deleted"), + ]) + .with_columns([ + pl.col("note").fill_null("unknown"), + (pl.col("balance") / 100).alias("balance"), + (pl.col("cleared_balance") / 100).alias("cleared_balance"), + (pl.col("uncleared_balance") / 100).alias("uncleared_balance"), + ]) + .drop([ + "transfer_payee_id", "direct_import_linked", "direct_import_in_error", + "last_reconciled_at", "debt_original_balance", "debt_interest_rates", + "debt_minimum_payments", "debt_escrow_amounts", "ingestion_date" + ]) + ) + # Write the DataFrame to a new parquet file + accounts_df.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet') -import pandas as pd -from datetime import datetime - -def handle_scd_type_2(dim_accounts_df, new_data_df): - current_date = datetime.now().date() - - for index, new_row in new_data_df.iterrows(): - account_id = new_row['account_id'] - existing_rows = dim_accounts_df[dim_accounts_df['account_id'] == account_id] - - if existing_rows.empty: - # Insert new record - new_row['start_date'] = current_date - new_row['end_date'] = None - new_row['is_current'] = True - dim_accounts_df = dim_accounts_df.append(new_row, ignore_index=True) - else: - current_row = existing_rows[existing_rows['is_current'] == True].iloc[0] - if not new_row.equals(current_row.drop(['surrogate_key', 'start_date', 'end_date', 'is_current'])): - # Update existing record to set is_current to False and end_date - dim_accounts_df.loc[current_row.name, 'is_current'] = False - dim_accounts_df.loc[current_row.name, 'end_date'] = current_date - - # Insert new record - new_row['start_date'] = current_date - new_row['end_date'] = None - new_row['is_current'] = True - dim_accounts_df = dim_accounts_df.append(new_row, ignore_index=True) - - return dim_accounts_df - -# Example usage -dim_accounts_df = pd.DataFrame(columns=[ - 'surrogate_key', 'account_id', 'account_name', 'account_type', 'on_budget', 'closed', 'note', - 'balance', 'cleared_balance', 'uncleared_balance', 'deleted', 'start_date', 'end_date', 'is_current' -]) - -new_data_df = pd.DataFrame([ - {'account_id': 1, 'account_name': 'Checking Account', 'account_type': 'checking', 'on_budget': True, 'closed': False, 'note': '', 'balance': 1000.00, 'cleared_balance': 1000.00, 'uncleared_balance': 0.00, 'deleted': False}, - {'account_id': 2, 'account_name': 'Savings Account', 'account_type': 'savings', 'on_budget': True, 'closed': False, 'note': '', 'balance': 5000.00, 'cleared_balance': 5000.00, 'uncleared_balance': 0.00, 'deleted': False} -]) - -dim_accounts_df = handle_scd_type_2(dim_accounts_df, new_data_df) \ No newline at end of file diff --git a/pipeline/ingest.py b/pipeline/ingest.py index 6dc6e0a..e49e4bf 100644 --- a/pipeline/ingest.py +++ b/pipeline/ingest.py @@ -15,6 +15,7 @@ class Ingest: self.base_url = config['base_url'] self.knowledge_file = config['knowledge_file'] self.entities = config['entities'] + self.raw_data_path = config['raw_data_path'] self.headers = {'Authorization': f'Bearer {self.api_token}'} self.knowledge_cache = self.load_knowledge_cache() self.fetch_and_cache_entity_data() @@ -33,7 +34,7 @@ class Ingest: Save the data for a specific entity to a new cache file. """ current_time = time.strftime('%Y%m%d%H%M%S') - directory = f'data/raw/{entity}' + directory = os.path.join(self.raw_data_path, entity) if not os.path.exists(directory): os.makedirs(directory) entity_file = f'{directory}/{current_time}.json' diff --git a/pipeline/raw_to_base.py b/pipeline/raw_to_base.py index 5b3c6ec..a4e2c7a 100644 --- a/pipeline/raw_to_base.py +++ b/pipeline/raw_to_base.py @@ -9,9 +9,9 @@ class RawToBase: def __init__(self, config: Dict[str, Any]): self.entities = config['entities'] self.primary_keys = config['primary_keys'] - self.raw_data_path = 'data/raw' - self.processed_data_path = 'data/processed' - self.base_data_path = 'data/base' + self.raw_data_path = config['raw_data_path'] + self.processed_data_path = config['processed_data_path'] + self.base_data_path = config['base_data_path'] self.data = {} self.base_data = {} logging.basicConfig(level=logging.DEBUG) diff --git a/test.py b/test.py index 930a1b1..44f707c 100644 --- a/test.py +++ b/test.py @@ -1,15 +1,28 @@ import polars as pl -entities = ['accounts', 'categories', 'months', 'payees', 'transactions', 'scheduled_transactions'] +entities = ['accounts']#, 'categories', 'months', 'payees', 'transactions', 'scheduled_transactions'] for entity in entities: # print(f"Processing entity: {entity}") - file_path = f'data/base/{entity}.parquet' - # Read the parquet file into a polars DataFrame - entity_df = pl.read_parquet(file_path) - # Print the schema of the DataFrame - print(f"Schema of {entity} DataFrame:") - print(entity_df.schema) - # Display the first few rows of the DataFrame - # print(f"First few rows of {entity} DataFrame:") - # print(entity_df.head()) + file_path = f'data/base/{entity}.parquet' + # Read the parquet file into a polars DataFrame + entity_df = pl.read_parquet(file_path) + # Print the schema of the DataFrame + print(f"Schema of {entity} DataFrame:") + print(entity_df.schema) + # Display the first few rows of the DataFrame + print(f"First few rows of {entity} DataFrame:") + print(entity_df.head()) + +for entity in entities: + # print(f"Processing entity: {entity}") + file_path = f'data/warehouse/{entity}.parquet' + # Read the parquet file into a polars DataFrame + entity_df = pl.read_parquet(file_path) + # Print the schema of the DataFrame + print(f"Schema of {entity} DataFrame:") + print(entity_df.schema) + # Display the first few rows of the DataFrame + print(f"First few rows of {entity} DataFrame:") + with pl. + print(entity_df)