diff --git a/dash_app.py b/dash_app.py new file mode 100644 index 0000000..9533285 --- /dev/null +++ b/dash_app.py @@ -0,0 +1,150 @@ +'''Module to create a Dash app that displays visualizations of YNAB data.''' + +import polars as pl +import plotly.express as px +from dash import Dash, html, dcc +import dash_bootstrap_components as dbc +import pandas as pd + +accounts = pl.read_parquet('data/warehouse/accounts.parquet') +categories = pl.read_parquet('data/warehouse/categories.parquet') +dates = pl.read_parquet('data/warehouse/dates.parquet') +payees = pl.read_parquet('data/warehouse/payees.parquet') +scheduled_transactions = pl.read_parquet('data/warehouse/scheduled_transactions.parquet') +transactions = pl.read_parquet('data/warehouse/transactions.parquet') + +# Join transactions with accounts, categories, and payees to create a master DataFrame +master_df = transactions.join(categories, left_on='category_id', right_on='id', suffix='_category')\ + .join(accounts, left_on='account_id', right_on='id', suffix='_account')\ + .join(payees, left_on='payee_id', right_on='id', suffix='_payee')\ + .join(dates, left_on='transaction_date', right_on='date_id', suffix='_date')\ + +# Create aggregations +spend_per_day = master_df.sql(''' + SELECT + date, + year, + month, + day, + ABS(SUM(transaction_amount)) as total + FROM self + WHERE category_name != 'Inflow: Ready to Assign' + GROUP BY date, year, month, day + ORDER BY date DESC + ''' +) + +spend_per_category = master_df.sql(''' + SELECT + category_name, + ABS(SUM(transaction_amount)) as total + FROM self + WHERE category_name != 'Inflow: Ready to Assign' + GROUP BY category_name + ORDER BY total DESC + ''' +) + +spend_per_payee = master_df.sql(''' + SELECT + payee_name, + ABS(SUM(transaction_amount)) as total + FROM self + WHERE payee_name != 'Starting Balance' + AND transaction_amount < 0 + GROUP BY payee_name + ORDER BY total DESC + ''' +) + +# Convert DataFrame to list of dictionaries +spend_per_day_data = spend_per_day.to_dicts() +spend_per_category_data = spend_per_category.to_dicts() +spend_per_payee_data = spend_per_payee.to_dicts() + +# Convert list of dictionaries to Pandas DataFrame +spend_per_day_df = pd.DataFrame(spend_per_day_data) +spend_per_category_df = pd.DataFrame(spend_per_category_data) +spend_per_payee_df = pd.DataFrame(spend_per_payee_data) + +spend_per_day_line = px.line(spend_per_day_df, x="date", y="total") +spend_per_day_line.update_layout( + plot_bgcolor='black', + paper_bgcolor='black', + font_color='white' +) + +spend_per_category_bar = px.bar(spend_per_category_df, x="category_name", y="total") +spend_per_category_bar.update_layout( + plot_bgcolor='black', + paper_bgcolor='black', + font_color='white' +) + +spend_per_payee_bar = px.bar(spend_per_payee_df, x="payee_name", y="total") +spend_per_payee_bar.update_layout( + plot_bgcolor='black', + paper_bgcolor='black', + font_color='white' +) + +# Initialize the app with a dark theme +app = Dash(external_stylesheets=[dbc.themes.DARKLY]) + +# App layout +app.layout = dbc.Container( + [ + dbc.Row( + dbc.Col( + html.Div("Data Pipeline For YNAB, Preview Visualisations", + className="text-center text-light"), + width=12 + ) + ), + dbc.Row( + [ + dbc.Col( + dbc.Card( + dbc.CardBody( + [ + html.H4("Spend Per Day", className="card-title"), + dcc.Graph(figure=spend_per_day_line) + ] + ), + className="mb-4" + ), + width=12 + ) + ] + ), + dbc.Row( + [ + dbc.Col( + dbc.Card( + dbc.CardBody( + [ + html.H4("Spend Per Category", className="card-title"), + dcc.Graph(figure=spend_per_category_bar) + ] + ), + className="mb-4" + ), + width=6 + ), + dbc.Col( + dbc.Card( + dbc.CardBody( + [ + html.H4("Spend Per Payee", className="card-title"), + dcc.Graph(figure=spend_per_payee_bar) + ] + ), + className="mb-4" + ), + width=6 + ) + ] + ) + ], + fluid=True +) diff --git a/main.py b/main.py index 33e686b..834983c 100644 --- a/main.py +++ b/main.py @@ -8,10 +8,8 @@ import logging.config import logging.handlers import config.exit_codes as ec -from pipeline.ingest import Ingest -from pipeline.raw_to_base import RawToBase -from pipeline.dimensions import DimAccounts, DimCategories, DimPayees, DimDate -from pipeline.facts import FactTransactions, FactScheduledTransactions +from dash_app import app +from pipeline.pipeline_main import pipeline_main def set_up_logging(): try: @@ -37,41 +35,30 @@ dotenv.load_dotenv() API_TOKEN = os.getenv('API_TOKEN') BUDGET_ID = os.getenv('BUDGET_ID') -def main(): - if not API_TOKEN or not BUDGET_ID: - logging.error('API_TOKEN or BUDGET_ID is not set in .env file') - sys.exit(ec.MISSING_ENV_VARS) - try: - with open('config/config.yaml', 'r') as file: - config = yaml.safe_load(file) - except FileNotFoundError: - logging.error('config.yaml file not found') - sys.exit(ec.MISSING_CONFIG_FILE) - except yaml.YAMLError as e: - logging.error(f'Error loading config.yaml: {e}') - sys.exit(ec.CORRUPTED_CONFIG_FILE) +if not API_TOKEN or not BUDGET_ID: + logging.error('API_TOKEN or BUDGET_ID is not set in .env file') + sys.exit(ec.MISSING_ENV_VARS) - config['API_TOKEN'] = API_TOKEN - config['BUDGET_ID'] = BUDGET_ID +try: + with open('config/config.yaml', 'r') as file: + config = yaml.safe_load(file) +except FileNotFoundError: + logging.error('config.yaml file not found') + sys.exit(ec.MISSING_CONFIG_FILE) +except yaml.YAMLError as e: + logging.error(f'Error loading config.yaml: {e}') + sys.exit(ec.CORRUPTED_CONFIG_FILE) - logging.info('Starting data pipeline') +config['API_TOKEN'] = API_TOKEN +config['BUDGET_ID'] = BUDGET_ID - Ingest(config) - RawToBase(config) - DimAccounts(config) - DimCategories(config) - DimPayees(config) - DimDate(config) - FactTransactions(config) - FactScheduledTransactions(config) - - logging.info('Data pipeline completed successfully') - sys.exit(ec.SUCCESS) + #sys.exit(ec.SUCCESS) if __name__ == '__main__': try: - main() + pipeline_main(config) + app.run() #debug=True) except SystemExit as e: exit_code = e.code if exit_code == ec.SUCCESS: diff --git a/pipeline/dimensions.py b/pipeline/dimensions.py index af0080b..2bc51e7 100644 --- a/pipeline/dimensions.py +++ b/pipeline/dimensions.py @@ -191,6 +191,18 @@ class DimDate(Dimensions): except Exception as e: logging.error(f"Failed to create a new column to indicate if the date is a weekday or weekend: {e}") return + + # Create a primary key by concatenating year, month, and day with no separators + try: + dates_df = dates_df.with_columns([ + (pl.col('year').cast(pl.Utf8) + + pl.col('month').cast(pl.Utf8).str.zfill(2) + + pl.col('day').cast(pl.Utf8).str.zfill(2) + ).alias('date_id') + ]) + except Exception as e: + logging.error(f"Failed to create the primary key column: {e}") + return # Write the DataFrame to a new parquet file logging.info("Writing the transformed dates DataFrame to parquet file") try: diff --git a/pipeline/facts.py b/pipeline/facts.py index 7611826..272ef11 100644 --- a/pipeline/facts.py +++ b/pipeline/facts.py @@ -27,12 +27,23 @@ class FactTransactions(Facts): # Transform the DataFrame logging.info("Transforming the transactions DataFrame") + try: + # Ensure the date column is in datetime format + transactions_df = transactions_df.with_columns([ + pl.col("date").str.strptime(pl.Date, format="%Y-%m-%d").alias("date") + ]) + except Exception as e: + logging.error(f"Failed to covert the date to date format: {e}") + return + try: transactions_df = ( transactions_df .with_columns([ pl.col("id").alias("transaction_id"), - pl.col("date").alias("transaction_date"), + (pl.col("date").dt.year().cast(pl.Utf8) + + pl.col("date").dt.month().cast(pl.Utf8).str.zfill(2) + + pl.col("date").dt.day().cast(pl.Utf8).str.zfill(2)).alias("transaction_date"), pl.col("amount").alias("transaction_amount"), pl.col("memo").alias("transaction_memo"), pl.col("cleared").alias("transaction_cleared"), @@ -45,7 +56,7 @@ class FactTransactions(Facts): ]) .with_columns([ pl.col("memo").fill_null("unknown"), - (pl.col("amount") / 100).alias("transaction_amount"), + (pl.col("amount") / 1000).alias("transaction_amount"), ]) .drop([ "transfer_transaction_id", "matched_transaction_id", "import_id", @@ -98,7 +109,7 @@ class FactScheduledTransactions(Facts): ]) .with_columns([ pl.col("memo").fill_null("unknown"), - (pl.col("amount") / 100).alias("scheduled_transaction_amount"), + (pl.col("amount") / 1000).alias("scheduled_transaction_amount"), ]) .drop([ "subtransactions", "deleted","flag_name","account_name", diff --git a/pipeline/pipeline_main.py b/pipeline/pipeline_main.py new file mode 100644 index 0000000..05d3a9b --- /dev/null +++ b/pipeline/pipeline_main.py @@ -0,0 +1,24 @@ +'''Module to run the data pipeline''' + +import logging + +from pipeline.ingest import Ingest +from pipeline.raw_to_base import RawToBase +from pipeline.dimensions import DimAccounts, DimCategories, DimPayees, DimDate +from pipeline.facts import FactTransactions, FactScheduledTransactions + + +def pipeline_main(config): + '''Run the data pipeline''' + logging.info('Starting data pipeline') + + Ingest(config) + RawToBase(config) + DimAccounts(config) + DimCategories(config) + DimPayees(config) + DimDate(config) + FactTransactions(config) + FactScheduledTransactions(config) + + logging.info('Data pipeline completed successfully') diff --git a/pipeline/raw_to_base.py b/pipeline/raw_to_base.py index 932bb78..88bac37 100644 --- a/pipeline/raw_to_base.py +++ b/pipeline/raw_to_base.py @@ -130,7 +130,7 @@ Then move the files back in one at a time oldest to newest and run again for eac df = df.with_columns( pl.when(pl.col(col).is_null()) .then(pl.lit("null")) - .otherwise(pl.col(col).map_elements(lambda x: str(x) if x is not None else "null")) + .otherwise(pl.col(col).map_elements(lambda x: str(x) if x is not None else "null", return_dtype=pl.Utf8)) .alias(col) ) return df diff --git a/requirements.txt b/requirements.txt index 5950c70..1642cd8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,9 @@ python-dotenv polars requests -pyyaml \ No newline at end of file +pyyaml +#visualisation requirements below +dash +pandas +pyarrow +dash-bootstrap-components \ No newline at end of file