accounts into dimension

This commit is contained in:
Jake Pullen
2024-08-09 15:14:04 +01:00
parent 165464820b
commit da5cf943d6
7 changed files with 75 additions and 73 deletions
+1
View File
@@ -6,3 +6,4 @@ data/*
.venv/* .venv/*
__pycache__/* __pycache__/*
*/__pycache__/* */__pycache__/*
*.pbix
+4
View File
@@ -20,3 +20,7 @@ primary_keys:
unique_id: id unique_id: id
scheduled_transactions: scheduled_transactions:
unique_id: id unique_id: id
raw_data_path: data/raw
processed_data_path: data/processed
base_data_path: data/base
warehouse_data_path: data/warehouse
+2
View File
@@ -5,6 +5,7 @@ import yaml
from pipeline.ingest import Ingest from pipeline.ingest import Ingest
from pipeline.raw_to_base import RawToBase from pipeline.raw_to_base import RawToBase
from pipeline.dimAccounts import DimAccounts
dotenv.load_dotenv() dotenv.load_dotenv()
@@ -20,3 +21,4 @@ config['BUDGET_ID'] = BUDGET_ID
Ingest(config) Ingest(config)
RawToBase(config) RawToBase(config)
DimAccounts(config)
+36 -55
View File
@@ -1,60 +1,41 @@
# This file is used to define the dimension table for the accounts table import polars as pl
# The accounts table contains information about the accounts in the budget class DimAccounts:
# The accounts table has the following columns: def __init__(self, config):
# - id: the unique identifier for the account self.config = config
# - name: the name of the account self.transform()
# - type: the type of the account (e.g. checking, savings, credit card)
# - on_budget: a boolean indicating whether the account is on budget
# - closed: a boolean indicating whether the account is closed
# - note: a note associated with the account
# - balance: the current balance of the account
# - cleared_balance: the cleared balance of the account
# - uncleared_balance: the uncleared balance of the account
# - deleted: a boolean indicating whether the account has been deleted
def transform(self):
file_path = self.config['base_data_path'] + '/accounts.parquet'
# Read the parquet file into a polars DataFrame
accounts_df = pl.read_parquet(file_path)
# the below is mega tbc # Transform the DataFrame
accounts_df = (
import pandas as pd accounts_df
from datetime import datetime .with_columns([
pl.col("id").alias("account_id"),
def handle_scd_type_2(dim_accounts_df, new_data_df): pl.col("name").alias("account_name"),
current_date = datetime.now().date() pl.col("type").alias("account_type"),
pl.col("on_budget").alias("on_budget"),
for index, new_row in new_data_df.iterrows(): pl.col("closed").alias("closed"),
account_id = new_row['account_id'] pl.col("note").alias("note"),
existing_rows = dim_accounts_df[dim_accounts_df['account_id'] == account_id] pl.col("balance").alias("balance"),
pl.col("cleared_balance").alias("cleared_balance"),
if existing_rows.empty: pl.col("uncleared_balance").alias("uncleared_balance"),
# Insert new record pl.col("deleted").alias("deleted"),
new_row['start_date'] = current_date
new_row['end_date'] = None
new_row['is_current'] = True
dim_accounts_df = dim_accounts_df.append(new_row, ignore_index=True)
else:
current_row = existing_rows[existing_rows['is_current'] == True].iloc[0]
if not new_row.equals(current_row.drop(['surrogate_key', 'start_date', 'end_date', 'is_current'])):
# Update existing record to set is_current to False and end_date
dim_accounts_df.loc[current_row.name, 'is_current'] = False
dim_accounts_df.loc[current_row.name, 'end_date'] = current_date
# Insert new record
new_row['start_date'] = current_date
new_row['end_date'] = None
new_row['is_current'] = True
dim_accounts_df = dim_accounts_df.append(new_row, ignore_index=True)
return dim_accounts_df
# Example usage
dim_accounts_df = pd.DataFrame(columns=[
'surrogate_key', 'account_id', 'account_name', 'account_type', 'on_budget', 'closed', 'note',
'balance', 'cleared_balance', 'uncleared_balance', 'deleted', 'start_date', 'end_date', 'is_current'
]) ])
.with_columns([
new_data_df = pd.DataFrame([ pl.col("note").fill_null("unknown"),
{'account_id': 1, 'account_name': 'Checking Account', 'account_type': 'checking', 'on_budget': True, 'closed': False, 'note': '', 'balance': 1000.00, 'cleared_balance': 1000.00, 'uncleared_balance': 0.00, 'deleted': False}, (pl.col("balance") / 100).alias("balance"),
{'account_id': 2, 'account_name': 'Savings Account', 'account_type': 'savings', 'on_budget': True, 'closed': False, 'note': '', 'balance': 5000.00, 'cleared_balance': 5000.00, 'uncleared_balance': 0.00, 'deleted': False} (pl.col("cleared_balance") / 100).alias("cleared_balance"),
(pl.col("uncleared_balance") / 100).alias("uncleared_balance"),
]) ])
.drop([
"transfer_payee_id", "direct_import_linked", "direct_import_in_error",
"last_reconciled_at", "debt_original_balance", "debt_interest_rates",
"debt_minimum_payments", "debt_escrow_amounts", "ingestion_date"
])
)
# Write the DataFrame to a new parquet file
accounts_df.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet')
dim_accounts_df = handle_scd_type_2(dim_accounts_df, new_data_df)
+2 -1
View File
@@ -15,6 +15,7 @@ class Ingest:
self.base_url = config['base_url'] self.base_url = config['base_url']
self.knowledge_file = config['knowledge_file'] self.knowledge_file = config['knowledge_file']
self.entities = config['entities'] self.entities = config['entities']
self.raw_data_path = config['raw_data_path']
self.headers = {'Authorization': f'Bearer {self.api_token}'} self.headers = {'Authorization': f'Bearer {self.api_token}'}
self.knowledge_cache = self.load_knowledge_cache() self.knowledge_cache = self.load_knowledge_cache()
self.fetch_and_cache_entity_data() self.fetch_and_cache_entity_data()
@@ -33,7 +34,7 @@ class Ingest:
Save the data for a specific entity to a new cache file. Save the data for a specific entity to a new cache file.
""" """
current_time = time.strftime('%Y%m%d%H%M%S') current_time = time.strftime('%Y%m%d%H%M%S')
directory = f'data/raw/{entity}' directory = os.path.join(self.raw_data_path, entity)
if not os.path.exists(directory): if not os.path.exists(directory):
os.makedirs(directory) os.makedirs(directory)
entity_file = f'{directory}/{current_time}.json' entity_file = f'{directory}/{current_time}.json'
+3 -3
View File
@@ -9,9 +9,9 @@ class RawToBase:
def __init__(self, config: Dict[str, Any]): def __init__(self, config: Dict[str, Any]):
self.entities = config['entities'] self.entities = config['entities']
self.primary_keys = config['primary_keys'] self.primary_keys = config['primary_keys']
self.raw_data_path = 'data/raw' self.raw_data_path = config['raw_data_path']
self.processed_data_path = 'data/processed' self.processed_data_path = config['processed_data_path']
self.base_data_path = 'data/base' self.base_data_path = config['base_data_path']
self.data = {} self.data = {}
self.base_data = {} self.base_data = {}
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
+16 -3
View File
@@ -1,6 +1,6 @@
import polars as pl import polars as pl
entities = ['accounts', 'categories', 'months', 'payees', 'transactions', 'scheduled_transactions'] entities = ['accounts']#, 'categories', 'months', 'payees', 'transactions', 'scheduled_transactions']
for entity in entities: for entity in entities:
# print(f"Processing entity: {entity}") # print(f"Processing entity: {entity}")
@@ -11,5 +11,18 @@ for entity in entities:
print(f"Schema of {entity} DataFrame:") print(f"Schema of {entity} DataFrame:")
print(entity_df.schema) print(entity_df.schema)
# Display the first few rows of the DataFrame # Display the first few rows of the DataFrame
# print(f"First few rows of {entity} DataFrame:") print(f"First few rows of {entity} DataFrame:")
# print(entity_df.head()) print(entity_df.head())
for entity in entities:
# print(f"Processing entity: {entity}")
file_path = f'data/warehouse/{entity}.parquet'
# Read the parquet file into a polars DataFrame
entity_df = pl.read_parquet(file_path)
# Print the schema of the DataFrame
print(f"Schema of {entity} DataFrame:")
print(entity_df.schema)
# Display the first few rows of the DataFrame
print(f"First few rows of {entity} DataFrame:")
with pl.
print(entity_df)