From cea3b94b8b2dc6f040c38f2dfbddecba1dfcdb7c Mon Sep 17 00:00:00 2001 From: Jake Pullen Date: Fri, 9 Aug 2024 08:53:02 +0100 Subject: [PATCH] base working fine i think --- config.yaml | 22 +++++++++++++++++++ docs/ERD.md | 24 ++++++++++++++++++++ docs/dataflow.md | 2 ++ ingest.py | 5 +++++ main.py | 18 +++++++-------- raw_to_base.py | 57 ++++++++++++++++++++++++------------------------ requirements.txt | 3 ++- 7 files changed, 92 insertions(+), 39 deletions(-) create mode 100644 config.yaml create mode 100644 docs/ERD.md create mode 100644 docs/dataflow.md diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..1e77a95 --- /dev/null +++ b/config.yaml @@ -0,0 +1,22 @@ +entities: + - accounts + - categories + - months + - payees + - transactions + - scheduled_transactions +base_url: https://api.ynab.com/v1/budgets +knowledge_file: server_knowledge_cache.json +primary_keys: + accounts: + unique_id: id + categories: + unique_id: id + months: + unique_id: month + payees: + unique_id: id + transactions: + unique_id: id + scheduled_transactions: + unique_id: id \ No newline at end of file diff --git a/docs/ERD.md b/docs/ERD.md new file mode 100644 index 0000000..aa3d17f --- /dev/null +++ b/docs/ERD.md @@ -0,0 +1,24 @@ +# ERD for the Finance DataWarehouse + +```mermaid +erDiagram + ACCOUNTS { + int account_id + string account_name + string account_type + boolean on_budget + boolean closed + text note + decimal balance + decimal cleared_balance + decimal uncleared_balance + boolean deleted + } + + ACCOUNT_TYPES { + int account_type_id + string account_type_name + } + + ACCOUNTS ||--o{ ACCOUNT_TYPES : "has type" +``` \ No newline at end of file diff --git a/docs/dataflow.md b/docs/dataflow.md new file mode 100644 index 0000000..6d3c736 --- /dev/null +++ b/docs/dataflow.md @@ -0,0 +1,2 @@ +# Flow of data from source to gold + diff --git a/ingest.py b/ingest.py index e0b4f70..ea796a4 100644 --- a/ingest.py +++ b/ingest.py @@ -75,6 +75,11 @@ class Ingest: Fetch and cache data for all entities. """ for entity in self.entities: + # if we already have files in the raw data folder, we need to skip that entity + file_path = f'data/raw/{entity}' + if os.path.exists(file_path) and os.listdir(file_path): + logging.warning(f"Skipping entity: {entity} as the raw data folder is not empty.") + continue last_knowledge = self.knowledge_cache.get(entity, 0) logging.debug(f'Last Knowledge of {entity.capitalize()}: {last_knowledge}') url = f'{self.base_url}/{self.budget_id}/{entity}' diff --git a/main.py b/main.py index 6749086..f6156aa 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,8 @@ import os import dotenv import logging +import yaml + from ingest import Ingest from raw_to_base import RawToBase @@ -10,15 +12,11 @@ API_TOKEN = os.getenv('API_TOKEN') BUDGET_ID = os.getenv('BUDGET_ID') logging.basicConfig(level=logging.DEBUG) -entities = ['accounts', 'categories', 'months', 'payees', 'transactions', 'scheduled_transactions'] -ingest_info = {} +with open('config.yaml', 'r') as file: + config = yaml.safe_load(file) -ingest_info['entities'] = entities -ingest_info['base_url'] = 'https://api.ynab.com/v1/budgets' -ingest_info['knowledge_file'] = 'server_knowledge_cache.json' -ingest_info['API_TOKEN'] = API_TOKEN -ingest_info['BUDGET_ID'] = BUDGET_ID +config['API_TOKEN'] = API_TOKEN +config['BUDGET_ID'] = BUDGET_ID - -Ingest(ingest_info) -RawToBase(entities) \ No newline at end of file +Ingest(config) +RawToBase(config) \ No newline at end of file diff --git a/raw_to_base.py b/raw_to_base.py index 80befcd..5b3c6ec 100644 --- a/raw_to_base.py +++ b/raw_to_base.py @@ -6,19 +6,12 @@ from typing import List, Dict, Any import polars as pl class RawToBase: - def __init__(self, entities: List[str]): - self.entities = entities - self.config = { - 'accounts': {'unique_id': 'id'}, - 'categories': {'unique_id': 'id'}, - 'months': {'unique_id': 'month'}, - 'payees': {'unique_id': 'id'}, - 'transactions': {'unique_id': 'id'}, - 'scheduled_transactions': {'unique_id': 'id'} - } + def __init__(self, config: Dict[str, Any]): + self.entities = config['entities'] + self.primary_keys = config['primary_keys'] self.raw_data_path = 'data/raw' - self.base_data_path = 'data/base' self.processed_data_path = 'data/processed' + self.base_data_path = 'data/base' self.data = {} self.base_data = {} logging.basicConfig(level=logging.DEBUG) @@ -55,26 +48,34 @@ class RawToBase: with open(file_path, 'r') as f: data = json.load(f) # Check if the data is empty + if entity == "categories": + # Check if any category group has categories + has_categories = any(group.get("categories") for group in data.get("category_groups", [])) + if not has_categories: + logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.") + os.remove(file_path) + return False + else: if not data.get(entity, []): logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.") # delete the file as it is empty os.remove(file_path) return False - modified_data = [] - if entity == 'categories': - for group in data.get('category_groups', []): - for category in group.get('categories', []): - category['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date() - modified_data.append(category) - else: - for record in data.get(f'{entity}', []): - if isinstance(record, dict): - record['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date() - modified_data.append(record) - else: - modified_data.append({'record': record, 'ingestion_date': datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()}) - self.data[entity].append(modified_data) - logging.debug(f"Successfully loaded data from file: {file_path}") + modified_data = [] + if entity == 'categories': + for group in data.get('category_groups', []): + for category in group.get('categories', []): + category['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date() + modified_data.append(category) + else: + for record in data.get(f'{entity}', []): + if isinstance(record, dict): + record['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date() + modified_data.append(record) + else: + modified_data.append({'record': record, 'ingestion_date': datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()}) + self.data[entity].append(modified_data) + logging.debug(f"Successfully loaded data from file: {file_path}") except Exception as e: logging.error(f"Failed to load data from file: {file_path}, error: {e}") exit(1) @@ -105,7 +106,7 @@ class RawToBase: #print(new_data_df) # Ensure the unique id column is preserved - unique_id = self.config[entity]['unique_id'] + unique_id = self.primary_keys[entity]['unique_id'] if unique_id not in new_data_df.columns: logging.error(f"Unique ID column '{unique_id}' not found in the combined data for entity: {entity}") exit(1) @@ -115,7 +116,7 @@ class RawToBase: def _resolve_duplicates(self, entity): logging.debug(f"Resolving duplicates for entity: {entity}") - unique_id = self.config[entity]['unique_id'] + unique_id = self.primary_keys[entity]['unique_id'] self.base_data[entity] = self.base_data[entity].sort(by='ingestion_date').unique(subset=unique_id, keep='first') logging.debug(f"Successfully resolved duplicates for entity: {entity}") diff --git a/requirements.txt b/requirements.txt index a6cff53..5950c70 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ python-dotenv polars -requests \ No newline at end of file +requests +pyyaml \ No newline at end of file