Files
data_pipeline_for_YNAB/visuals/components.py
T
2025-05-30 11:43:47 +01:00

134 lines
4.6 KiB
Python

import polars as pl
import plotly.express as px
import pandas as pd
import logging
# import datetime
class data_components:
accounts = pl.read_parquet("data/warehouse/accounts.parquet")
categories = pl.read_parquet("data/warehouse/categories.parquet")
dates = pl.read_parquet("data/warehouse/dates.parquet")
payees = pl.read_parquet("data/warehouse/payees.parquet")
scheduled_transactions = pl.read_parquet(
"data/warehouse/scheduled_transactions.parquet"
)
transactions = pl.read_parquet("data/warehouse/transactions.parquet")
master_transactions = (
transactions.join(
categories,
left_on="category_id",
right_on="category_id",
suffix="_category",
)
.join(accounts, left_on="account_id", right_on="account_id", suffix="_account")
.join(payees, left_on="payee_id", right_on="payee_id", suffix="_payee")
.join(dates, left_on="transaction_date", right_on="date_id", suffix="_date")
)
def __init__(self):
logging.info("Initializing data components")
pass
def update_dates(start_date, end_date):
logging.info("Updating dates")
logging.debug(f"start_date: {start_date}, end_date: {end_date}")
logging.debug(data_components.master_transactions.columns)
try:
master_data = data_components.master_transactions.filter(
pl.col("date").is_between(start_date, end_date)
)
except Exception as e:
logging.error(f"Error updating dates: {e}")
raise e
return master_data
def update_data(master_data, callback=0):
# Create aggregations
spend_per_day = master_data.sql("""
SELECT
date,
year,
month,
day,
ABS(SUM(transaction_amount)) as total
FROM self
WHERE category_name != 'Inflow: Ready to Assign'
GROUP BY date, year, month, day
ORDER BY date DESC
""")
spend_per_category = master_data.sql("""
SELECT
category_name,
ABS(SUM(transaction_amount)) as total
FROM self
WHERE category_name != 'Inflow: Ready to Assign'
GROUP BY category_name
ORDER BY total DESC
""")
spend_per_payee = master_data.sql("""
SELECT
payee_name,
ABS(SUM(transaction_amount)) as total
FROM self
WHERE payee_name != 'Starting Balance'
AND transaction_amount < 0
GROUP BY payee_name
ORDER BY total DESC
""")
total_spend = master_data.sql("""
SELECT ABS(SUM(transaction_amount)) AS total
FROM self
WHERE payee_name != 'Starting Balance'
AND transaction_amount < 0
""").item()
# Convert DataFrame to list of dictionaries
spend_per_day_data = spend_per_day.to_dicts()
spend_per_category_data = spend_per_category.to_dicts()
spend_per_payee_data = spend_per_payee.to_dicts()
# Convert list of dictionaries to Pandas DataFrame
spend_per_day_df = pd.DataFrame(spend_per_day_data)
spend_per_category_df = pd.DataFrame(spend_per_category_data)
spend_per_payee_df = pd.DataFrame(spend_per_payee_data)
spend_per_day_line = px.line(spend_per_day_df, x="date", y="total")
spend_per_day_line.update_layout(
plot_bgcolor="black", paper_bgcolor="black", font_color="white"
)
spend_per_category_bar = px.bar(
spend_per_category_df, x="category_name", y="total"
)
spend_per_category_bar.update_layout(
plot_bgcolor="black", paper_bgcolor="black", font_color="white"
)
spend_per_payee_bar = px.bar(spend_per_payee_df, x="payee_name", y="total")
spend_per_payee_bar.update_layout(
plot_bgcolor="black", paper_bgcolor="black", font_color="white"
)
total_spend_line = f"### £{total_spend:,.2f}"
data = {
"spend_per_day_line": spend_per_day_line,
"spend_per_category_bar": spend_per_category_bar,
"spend_per_payee_bar": spend_per_payee_bar,
"total_spend": total_spend_line,
}
if callback == 0:
return data
else:
return (
spend_per_day_line,
spend_per_category_bar,
spend_per_payee_bar,
total_spend,
)