dimensions done, still need to fully test

This commit is contained in:
Jake Pullen
2024-08-09 16:27:36 +01:00
parent da5cf943d6
commit b7b6f8887c
4 changed files with 169 additions and 58 deletions
+11 -4
View File
@@ -5,7 +5,7 @@ import yaml
from pipeline.ingest import Ingest from pipeline.ingest import Ingest
from pipeline.raw_to_base import RawToBase from pipeline.raw_to_base import RawToBase
from pipeline.dimAccounts import DimAccounts from pipeline.dimensions import DimAccounts, DimCategories, DimPayees, DimDate
dotenv.load_dotenv() dotenv.load_dotenv()
@@ -19,6 +19,13 @@ with open('config.yaml', 'r') as file:
config['API_TOKEN'] = API_TOKEN config['API_TOKEN'] = API_TOKEN
config['BUDGET_ID'] = BUDGET_ID config['BUDGET_ID'] = BUDGET_ID
Ingest(config) if __name__ == '__main__':
RawToBase(config) #Ingest(config)
DimAccounts(config) #RawToBase(config)
#DimAccounts(config)
#DimCategories(config)
#DimPayees(config)
DimDate(config)
# dates_df = pl.DataFrame(pl.date_range('2020-01-01', '2030-12-31',"1d", eager=True)).alias('date')
-41
View File
@@ -1,41 +0,0 @@
import polars as pl
class DimAccounts:
def __init__(self, config):
self.config = config
self.transform()
def transform(self):
file_path = self.config['base_data_path'] + '/accounts.parquet'
# Read the parquet file into a polars DataFrame
accounts_df = pl.read_parquet(file_path)
# Transform the DataFrame
accounts_df = (
accounts_df
.with_columns([
pl.col("id").alias("account_id"),
pl.col("name").alias("account_name"),
pl.col("type").alias("account_type"),
pl.col("on_budget").alias("on_budget"),
pl.col("closed").alias("closed"),
pl.col("note").alias("note"),
pl.col("balance").alias("balance"),
pl.col("cleared_balance").alias("cleared_balance"),
pl.col("uncleared_balance").alias("uncleared_balance"),
pl.col("deleted").alias("deleted"),
])
.with_columns([
pl.col("note").fill_null("unknown"),
(pl.col("balance") / 100).alias("balance"),
(pl.col("cleared_balance") / 100).alias("cleared_balance"),
(pl.col("uncleared_balance") / 100).alias("uncleared_balance"),
])
.drop([
"transfer_payee_id", "direct_import_linked", "direct_import_in_error",
"last_reconciled_at", "debt_original_balance", "debt_interest_rates",
"debt_minimum_payments", "debt_escrow_amounts", "ingestion_date"
])
)
# Write the DataFrame to a new parquet file
accounts_df.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet')
+144
View File
@@ -0,0 +1,144 @@
import polars as pl
import logging
import os
from datetime import date
class Dimensions:
def __init__(self, config):
self.config = config
self.base_file_path = self.config['base_data_path']
os.makedirs(self.config['warehouse_data_path'], exist_ok=True)
def get_full_file_path(self, file_name):
return f"{self.base_file_path}/{file_name}"
class DimAccounts(Dimensions):
def __init__(self, config):
super().__init__(config)
self.file_path = self.get_full_file_path('accounts.parquet')
self.transform()
def transform(self):
# Read the parquet file into a polars DataFrame
accounts_df = pl.read_parquet(self.file_path)
# Transform the DataFrame
logging.info("Transforming the accounts DataFrame")
accounts_df = (
accounts_df
.with_columns([
pl.col("id").alias("account_id"),
pl.col("name").alias("account_name"),
pl.col("type").alias("account_type"),
pl.col("on_budget").alias("on_budget"),
pl.col("closed").alias("closed"),
pl.col("note").alias("note"),
pl.col("balance").alias("balance"),
pl.col("cleared_balance").alias("cleared_balance"),
pl.col("uncleared_balance").alias("uncleared_balance"),
pl.col("deleted").alias("deleted"),
])
.with_columns([
pl.col("note").fill_null("unknown"),
(pl.col("balance") / 100).alias("balance"),
(pl.col("cleared_balance") / 100).alias("cleared_balance"),
(pl.col("uncleared_balance") / 100).alias("uncleared_balance"),
])
.drop([
"transfer_payee_id", "direct_import_linked", "direct_import_in_error",
"last_reconciled_at", "debt_original_balance", "debt_interest_rates",
"debt_minimum_payments", "debt_escrow_amounts", "ingestion_date"
])
)
# Write the DataFrame to a new parquet file
logging.info("Writing the transformed accounts DataFrame to parquet file")
accounts_df.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet')
class DimCategories(Dimensions):
def __init__(self, config):
super().__init__(config)
self.file_path = self.get_full_file_path('categories.parquet')
self.transform()
def transform(self):
# Read the parquet file into a polars DataFrame
categories_df = pl.read_parquet(self.file_path)
logging.info("Transforming the categories DataFrame")
# Select the required columns
categories_df = categories_df.select([
'id',
'name',
'category_group_name',
'hidden',
'note',
'budgeted',
'activity',
'balance',
'deleted'
])
# Rename the columns
categories_df = categories_df.with_columns(pl.col('id').alias('category_id'))
categories_df = categories_df.with_columns(pl.col('name').alias('category_name'))
# Fill null values in the note column
categories_df = categories_df.with_columns(pl.col('note').fill_null('unknown'))
# Convert the balance, budgeted, and activity columns to decimal
categories_df = categories_df.with_columns(pl.col('balance') / 100)
categories_df = categories_df.with_columns(pl.col('budgeted') / 100)
categories_df = categories_df.with_columns(pl.col('activity') / 100)
# Write the DataFrame to a new parquet file
logging.info("Writing the transformed categories DataFrame to parquet file")
categories_df.write_parquet(self.config['warehouse_data_path'] + '/categories.parquet')
class DimPayees(Dimensions):
def __init__(self, config):
super().__init__(config)
self.file_path = self.get_full_file_path('payees.parquet')
self.transform()
def transform(self):
# Read the parquet file into a polars DataFrame
payees_df = pl.read_parquet(self.file_path)
logging.info("Transforming the payees DataFrame")
# Select the required columns
payees_df = payees_df.select([
'id',
'name',
'deleted'
])
# Rename the columns
payees_df = payees_df.with_columns(pl.col('id').alias('payee_id'))
payees_df = payees_df.with_columns(pl.col('name').alias('payee_name'))
# Write the DataFrame to a new parquet file
logging.info("Writing the transformed payees DataFrame to parquet file")
payees_df.write_parquet(self.config['warehouse_data_path'] + '/payees.parquet')
class DimDate(Dimensions):
def __init__(self, config):
super().__init__(config)
self.transform()
def transform(self):
# Create a DataFrame with dates from 2020-01-01 to 2030-12-31
dates_df = pl.DataFrame({'date':pl.date_range(date(2020, 1, 1), date(2030, 12, 31), "1d", eager=True)})
# Extract year, month, day, and weekday from the date column
dates_df = dates_df.with_columns([
pl.col('date').dt.year().alias('year'),
pl.col('date').dt.month().alias('month'),
pl.col('date').dt.day().alias('day'),
pl.col('date').dt.weekday().alias('weekday')
])
# Create a new column to indicate if the date is a weekday or weekend
dates_df = dates_df.with_columns([
(pl.col('weekday') < 5).alias('is_weekday') # True for weekdays (Monday to Friday), False for weekends (Saturday and Sunday)
])
# Write the DataFrame to a new parquet file
logging.info("Writing the transformed dates DataFrame to parquet file")
dates_df.write_parquet(self.config['warehouse_data_path'] + '/dates.parquet')
+14 -13
View File
@@ -1,6 +1,8 @@
import polars as pl import polars as pl
entities = ['accounts']#, 'categories', 'months', 'payees', 'transactions', 'scheduled_transactions'] #entities = ['accounts', 'categories', 'months', 'payees', 'transactions', 'scheduled_transactions']
entities = ['payees']
for entity in entities: for entity in entities:
# print(f"Processing entity: {entity}") # print(f"Processing entity: {entity}")
@@ -14,15 +16,14 @@ for entity in entities:
print(f"First few rows of {entity} DataFrame:") print(f"First few rows of {entity} DataFrame:")
print(entity_df.head()) print(entity_df.head())
for entity in entities: # for entity in entities:
# print(f"Processing entity: {entity}") # # print(f"Processing entity: {entity}")
file_path = f'data/warehouse/{entity}.parquet' # file_path = f'data/warehouse/{entity}.parquet'
# Read the parquet file into a polars DataFrame # # Read the parquet file into a polars DataFrame
entity_df = pl.read_parquet(file_path) # entity_df = pl.read_parquet(file_path)
# Print the schema of the DataFrame # # Print the schema of the DataFrame
print(f"Schema of {entity} DataFrame:") # print(f"Schema of {entity} DataFrame:")
print(entity_df.schema) # print(entity_df.schema)
# Display the first few rows of the DataFrame # # Display the first few rows of the DataFrame
print(f"First few rows of {entity} DataFrame:") # print(f"First few rows of {entity} DataFrame:")
with pl. # print(entity_df)
print(entity_df)