Merge pull request #8 from Jake-Pullen/feature/add_visuals

Feature/add visuals
This commit is contained in:
Jake-Pullen
2024-08-28 15:26:01 +01:00
committed by GitHub
7 changed files with 226 additions and 37 deletions
+150
View File
@@ -0,0 +1,150 @@
'''Module to create a Dash app that displays visualizations of YNAB data.'''
import polars as pl
import plotly.express as px
from dash import Dash, html, dcc
import dash_bootstrap_components as dbc
import pandas as pd
# Load each warehouse table from its parquet file.
accounts = pl.read_parquet('data/warehouse/accounts.parquet')
categories = pl.read_parquet('data/warehouse/categories.parquet')
dates = pl.read_parquet('data/warehouse/dates.parquet')
payees = pl.read_parquet('data/warehouse/payees.parquet')
# NOTE(review): scheduled_transactions is loaded but none of the aggregations
# in this module read it - confirm whether a chart for it is still planned.
scheduled_transactions = pl.read_parquet('data/warehouse/scheduled_transactions.parquet')
transactions = pl.read_parquet('data/warehouse/transactions.parquet')

# Join transactions with categories, accounts, payees and dates to create a
# master DataFrame. No `how` is given, so each step is polars' default inner
# join: a transaction without a match in any one of the tables is dropped.
# The chain is wrapped in parentheses instead of using backslash
# continuations, so no trailing backslash can splice the following line into
# this statement.
master_df = (
    transactions
    .join(categories, left_on='category_id', right_on='id', suffix='_category')
    .join(accounts, left_on='account_id', right_on='id', suffix='_account')
    .join(payees, left_on='payee_id', right_on='id', suffix='_payee')
    .join(dates, left_on='transaction_date', right_on='date_id', suffix='_date')
)
# Create aggregations
# The SQL for each aggregation is kept in a named constant so the query text
# can be read on its own. `self` is the table name under which
# DataFrame.sql() exposes the frame being queried.

# Absolute value of the summed amounts per calendar date, newest date first,
# ignoring rows in the 'Inflow: Ready to Assign' category.
_SPEND_PER_DAY_SQL = '''
SELECT
date,
year,
month,
day,
ABS(SUM(transaction_amount)) as total
FROM self
WHERE category_name != 'Inflow: Ready to Assign'
GROUP BY date, year, month, day
ORDER BY date DESC
'''

# Absolute value of the summed amounts per category, largest total first,
# with the same inflow category excluded.
_SPEND_PER_CATEGORY_SQL = '''
SELECT
category_name,
ABS(SUM(transaction_amount)) as total
FROM self
WHERE category_name != 'Inflow: Ready to Assign'
GROUP BY category_name
ORDER BY total DESC
'''

# Absolute value of the summed negative amounts per payee, largest total
# first; the 'Starting Balance' payee is left out.
_SPEND_PER_PAYEE_SQL = '''
SELECT
payee_name,
ABS(SUM(transaction_amount)) as total
FROM self
WHERE payee_name != 'Starting Balance'
AND transaction_amount < 0
GROUP BY payee_name
ORDER BY total DESC
'''

spend_per_day = master_df.sql(_SPEND_PER_DAY_SQL)
spend_per_category = master_df.sql(_SPEND_PER_CATEGORY_SQL)
spend_per_payee = master_df.sql(_SPEND_PER_PAYEE_SQL)
# Row-wise copies of each aggregation (one dict per row). The charts no longer
# read these; the names are kept so that anything importing them from this
# module keeps working.
spend_per_day_data = spend_per_day.to_dicts()
spend_per_category_data = spend_per_category.to_dicts()
spend_per_payee_data = spend_per_payee.to_dicts()

# Convert the polars results to pandas for plotly express.
# DataFrame.to_pandas() carries the schema across, so the frame still has its
# columns when a query returns no rows. Building the frame from a list of
# dicts produced a column-less DataFrame in that case, and the charts then
# could not find their "date" / "total" columns.
# to_pandas() needs pandas and pyarrow installed.
spend_per_day_df = spend_per_day.to_pandas()
spend_per_category_df = spend_per_category.to_pandas()
spend_per_payee_df = spend_per_payee.to_pandas()
# Layout overrides shared by every figure: black plot and paper backgrounds
# with white text.
_dark_figure_layout = dict(
    plot_bgcolor='black',
    paper_bgcolor='black',
    font_color='white',
)

# update_layout() returns the figure it was called on, so each figure is
# created and themed in a single expression.
spend_per_day_line = px.line(
    spend_per_day_df, x="date", y="total"
).update_layout(**_dark_figure_layout)

spend_per_category_bar = px.bar(
    spend_per_category_df, x="category_name", y="total"
).update_layout(**_dark_figure_layout)

spend_per_payee_bar = px.bar(
    spend_per_payee_df, x="payee_name", y="total"
).update_layout(**_dark_figure_layout)
def _graph_card(title, figure, width):
    '''Return a grid column holding one titled card that shows `figure`.

    title  -- text for the card's H4 heading
    figure -- plotly figure handed to dcc.Graph
    width  -- column width passed straight to dbc.Col
    '''
    card_body = dbc.CardBody(
        [
            html.H4(title, className="card-title"),
            dcc.Graph(figure=figure),
        ]
    )
    return dbc.Col(dbc.Card(card_body, className="mb-4"), width=width)


# Initialize the app with a dark theme
app = Dash(external_stylesheets=[dbc.themes.DARKLY])

# Full-width, centred page heading.
_heading_row = dbc.Row(
    dbc.Col(
        html.Div(
            "Data Pipeline For YNAB, Preview Visualisations",
            className="text-center text-light",
        ),
        width=12,
    )
)

# The daily line chart takes a whole row.
_daily_row = dbc.Row(
    [
        _graph_card("Spend Per Day", spend_per_day_line, 12),
    ]
)

# The category and payee bar charts share a row, one half each.
_breakdown_row = dbc.Row(
    [
        _graph_card("Spend Per Category", spend_per_category_bar, 6),
        _graph_card("Spend Per Payee", spend_per_payee_bar, 6),
    ]
)

# App layout
app.layout = dbc.Container(
    [_heading_row, _daily_row, _breakdown_row],
    fluid=True,
)
+6 -19
View File
@@ -8,10 +8,8 @@ import logging.config
import logging.handlers import logging.handlers
import config.exit_codes as ec import config.exit_codes as ec
from pipeline.ingest import Ingest from dash_app import app
from pipeline.raw_to_base import RawToBase from pipeline.pipeline_main import pipeline_main
from pipeline.dimensions import DimAccounts, DimCategories, DimPayees, DimDate
from pipeline.facts import FactTransactions, FactScheduledTransactions
def set_up_logging(): def set_up_logging():
try: try:
@@ -37,7 +35,7 @@ dotenv.load_dotenv()
API_TOKEN = os.getenv('API_TOKEN') API_TOKEN = os.getenv('API_TOKEN')
BUDGET_ID = os.getenv('BUDGET_ID') BUDGET_ID = os.getenv('BUDGET_ID')
def main():
if not API_TOKEN or not BUDGET_ID: if not API_TOKEN or not BUDGET_ID:
logging.error('API_TOKEN or BUDGET_ID is not set in .env file') logging.error('API_TOKEN or BUDGET_ID is not set in .env file')
sys.exit(ec.MISSING_ENV_VARS) sys.exit(ec.MISSING_ENV_VARS)
@@ -55,23 +53,12 @@ def main():
config['API_TOKEN'] = API_TOKEN config['API_TOKEN'] = API_TOKEN
config['BUDGET_ID'] = BUDGET_ID config['BUDGET_ID'] = BUDGET_ID
logging.info('Starting data pipeline') #sys.exit(ec.SUCCESS)
Ingest(config)
RawToBase(config)
DimAccounts(config)
DimCategories(config)
DimPayees(config)
DimDate(config)
FactTransactions(config)
FactScheduledTransactions(config)
logging.info('Data pipeline completed successfully')
sys.exit(ec.SUCCESS)
if __name__ == '__main__': if __name__ == '__main__':
try: try:
main() pipeline_main(config)
app.run() #debug=True)
except SystemExit as e: except SystemExit as e:
exit_code = e.code exit_code = e.code
if exit_code == ec.SUCCESS: if exit_code == ec.SUCCESS:
+12
View File
@@ -191,6 +191,18 @@ class DimDate(Dimensions):
except Exception as e: except Exception as e:
logging.error(f"Failed to create a new column to indicate if the date is a weekday or weekend: {e}") logging.error(f"Failed to create a new column to indicate if the date is a weekday or weekend: {e}")
return return
# Create a primary key by concatenating year, month, and day with no separators
try:
dates_df = dates_df.with_columns([
(pl.col('year').cast(pl.Utf8) +
pl.col('month').cast(pl.Utf8).str.zfill(2) +
pl.col('day').cast(pl.Utf8).str.zfill(2)
).alias('date_id')
])
except Exception as e:
logging.error(f"Failed to create the primary key column: {e}")
return
# Write the DataFrame to a new parquet file # Write the DataFrame to a new parquet file
logging.info("Writing the transformed dates DataFrame to parquet file") logging.info("Writing the transformed dates DataFrame to parquet file")
try: try:
+14 -3
View File
@@ -27,12 +27,23 @@ class FactTransactions(Facts):
# Transform the DataFrame # Transform the DataFrame
logging.info("Transforming the transactions DataFrame") logging.info("Transforming the transactions DataFrame")
try:
# Ensure the date column is in datetime format
transactions_df = transactions_df.with_columns([
pl.col("date").str.strptime(pl.Date, format="%Y-%m-%d").alias("date")
])
except Exception as e:
logging.error(f"Failed to covert the date to date format: {e}")
return
try: try:
transactions_df = ( transactions_df = (
transactions_df transactions_df
.with_columns([ .with_columns([
pl.col("id").alias("transaction_id"), pl.col("id").alias("transaction_id"),
pl.col("date").alias("transaction_date"), (pl.col("date").dt.year().cast(pl.Utf8) +
pl.col("date").dt.month().cast(pl.Utf8).str.zfill(2) +
pl.col("date").dt.day().cast(pl.Utf8).str.zfill(2)).alias("transaction_date"),
pl.col("amount").alias("transaction_amount"), pl.col("amount").alias("transaction_amount"),
pl.col("memo").alias("transaction_memo"), pl.col("memo").alias("transaction_memo"),
pl.col("cleared").alias("transaction_cleared"), pl.col("cleared").alias("transaction_cleared"),
@@ -45,7 +56,7 @@ class FactTransactions(Facts):
]) ])
.with_columns([ .with_columns([
pl.col("memo").fill_null("unknown"), pl.col("memo").fill_null("unknown"),
(pl.col("amount") / 100).alias("transaction_amount"), (pl.col("amount") / 1000).alias("transaction_amount"),
]) ])
.drop([ .drop([
"transfer_transaction_id", "matched_transaction_id", "import_id", "transfer_transaction_id", "matched_transaction_id", "import_id",
@@ -98,7 +109,7 @@ class FactScheduledTransactions(Facts):
]) ])
.with_columns([ .with_columns([
pl.col("memo").fill_null("unknown"), pl.col("memo").fill_null("unknown"),
(pl.col("amount") / 100).alias("scheduled_transaction_amount"), (pl.col("amount") / 1000).alias("scheduled_transaction_amount"),
]) ])
.drop([ .drop([
"subtransactions", "deleted","flag_name","account_name", "subtransactions", "deleted","flag_name","account_name",
+24
View File
@@ -0,0 +1,24 @@
'''Module to run the data pipeline'''
import logging
from pipeline.ingest import Ingest
from pipeline.raw_to_base import RawToBase
from pipeline.dimensions import DimAccounts, DimCategories, DimPayees, DimDate
from pipeline.facts import FactTransactions, FactScheduledTransactions
def pipeline_main(config):
    '''Run the data pipeline.

    Each stage is invoked by calling its class with `config`; the resulting
    objects are discarded. Stages run one after another in the order listed:
    ingest, raw-to-base, the four dimensions, then the two fact tables.

    config -- passed unchanged to every stage
    '''
    logging.info('Starting data pipeline')

    stages = (
        Ingest,
        RawToBase,
        DimAccounts,
        DimCategories,
        DimPayees,
        DimDate,
        FactTransactions,
        FactScheduledTransactions,
    )
    for stage in stages:
        stage(config)

    logging.info('Data pipeline completed successfully')
+1 -1
View File
@@ -130,7 +130,7 @@ Then move the files back in one at a time oldest to newest and run again for eac
df = df.with_columns( df = df.with_columns(
pl.when(pl.col(col).is_null()) pl.when(pl.col(col).is_null())
.then(pl.lit("null")) .then(pl.lit("null"))
.otherwise(pl.col(col).map_elements(lambda x: str(x) if x is not None else "null")) .otherwise(pl.col(col).map_elements(lambda x: str(x) if x is not None else "null", return_dtype=pl.Utf8))
.alias(col) .alias(col)
) )
return df return df
+5
View File
@@ -2,3 +2,8 @@ python-dotenv
polars polars
requests requests
pyyaml pyyaml
#visualisation requirements below
dash
pandas
pyarrow
dash-bootstrap-components