base working fine i think

This commit is contained in:
Jake Pullen
2024-08-09 08:53:02 +01:00
parent 41046c0da1
commit cea3b94b8b
7 changed files with 92 additions and 39 deletions
+22
View File
@@ -0,0 +1,22 @@
entities:
- accounts
- categories
- months
- payees
- transactions
- scheduled_transactions
base_url: https://api.ynab.com/v1/budgets
knowledge_file: server_knowledge_cache.json
primary_keys:
accounts:
unique_id: id
categories:
unique_id: id
months:
unique_id: month
payees:
unique_id: id
transactions:
unique_id: id
scheduled_transactions:
unique_id: id
+24
View File
@@ -0,0 +1,24 @@
# ERD for the Finance DataWarehouse
```mermaid
erDiagram
ACCOUNTS {
int account_id
string account_name
string account_type
boolean on_budget
boolean closed
text note
decimal balance
decimal cleared_balance
decimal uncleared_balance
boolean deleted
}
ACCOUNT_TYPES {
int account_type_id
string account_type_name
}
ACCOUNTS ||--o{ ACCOUNT_TYPES : "has type"
```
+2
View File
@@ -0,0 +1,2 @@
# Flow of data from source to gold
+5
View File
@@ -75,6 +75,11 @@ class Ingest:
Fetch and cache data for all entities.
"""
for entity in self.entities:
# if we already have files in the raw data folder, we need to skip that entity
file_path = f'data/raw/{entity}'
if os.path.exists(file_path) and os.listdir(file_path):
logging.warning(f"Skipping entity: {entity} as the raw data folder is not empty.")
continue
last_knowledge = self.knowledge_cache.get(entity, 0)
logging.debug(f'Last Knowledge of {entity.capitalize()}: {last_knowledge}')
url = f'{self.base_url}/{self.budget_id}/{entity}'
+8 -10
View File
@@ -1,6 +1,8 @@
import os
import dotenv
import logging
import yaml
from ingest import Ingest
from raw_to_base import RawToBase
@@ -10,15 +12,11 @@ API_TOKEN = os.getenv('API_TOKEN')
BUDGET_ID = os.getenv('BUDGET_ID')
logging.basicConfig(level=logging.DEBUG)
entities = ['accounts', 'categories', 'months', 'payees', 'transactions', 'scheduled_transactions']
ingest_info = {}
with open('config.yaml', 'r') as file:
config = yaml.safe_load(file)
ingest_info['entities'] = entities
ingest_info['base_url'] = 'https://api.ynab.com/v1/budgets'
ingest_info['knowledge_file'] = 'server_knowledge_cache.json'
ingest_info['API_TOKEN'] = API_TOKEN
ingest_info['BUDGET_ID'] = BUDGET_ID
config['API_TOKEN'] = API_TOKEN
config['BUDGET_ID'] = BUDGET_ID
Ingest(ingest_info)
RawToBase(entities)
Ingest(config)
RawToBase(config)
+29 -28
View File
@@ -6,19 +6,12 @@ from typing import List, Dict, Any
import polars as pl
class RawToBase:
def __init__(self, entities: List[str]):
self.entities = entities
self.config = {
'accounts': {'unique_id': 'id'},
'categories': {'unique_id': 'id'},
'months': {'unique_id': 'month'},
'payees': {'unique_id': 'id'},
'transactions': {'unique_id': 'id'},
'scheduled_transactions': {'unique_id': 'id'}
}
def __init__(self, config: Dict[str, Any]):
self.entities = config['entities']
self.primary_keys = config['primary_keys']
self.raw_data_path = 'data/raw'
self.base_data_path = 'data/base'
self.processed_data_path = 'data/processed'
self.base_data_path = 'data/base'
self.data = {}
self.base_data = {}
logging.basicConfig(level=logging.DEBUG)
@@ -55,26 +48,34 @@ class RawToBase:
with open(file_path, 'r') as f:
data = json.load(f)
# Check if the data is empty
if entity == "categories":
# Check if any category group has categories
has_categories = any(group.get("categories") for group in data.get("category_groups", []))
if not has_categories:
logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.")
os.remove(file_path)
return False
else:
if not data.get(entity, []):
logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.")
# delete the file as it is empty
os.remove(file_path)
return False
modified_data = []
if entity == 'categories':
for group in data.get('category_groups', []):
for category in group.get('categories', []):
category['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()
modified_data.append(category)
else:
for record in data.get(f'{entity}', []):
if isinstance(record, dict):
record['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()
modified_data.append(record)
else:
modified_data.append({'record': record, 'ingestion_date': datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()})
self.data[entity].append(modified_data)
logging.debug(f"Successfully loaded data from file: {file_path}")
modified_data = []
if entity == 'categories':
for group in data.get('category_groups', []):
for category in group.get('categories', []):
category['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()
modified_data.append(category)
else:
for record in data.get(f'{entity}', []):
if isinstance(record, dict):
record['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()
modified_data.append(record)
else:
modified_data.append({'record': record, 'ingestion_date': datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()})
self.data[entity].append(modified_data)
logging.debug(f"Successfully loaded data from file: {file_path}")
except Exception as e:
logging.error(f"Failed to load data from file: {file_path}, error: {e}")
exit(1)
@@ -105,7 +106,7 @@ class RawToBase:
#print(new_data_df)
# Ensure the unique id column is preserved
unique_id = self.config[entity]['unique_id']
unique_id = self.primary_keys[entity]['unique_id']
if unique_id not in new_data_df.columns:
logging.error(f"Unique ID column '{unique_id}' not found in the combined data for entity: {entity}")
exit(1)
@@ -115,7 +116,7 @@ class RawToBase:
def _resolve_duplicates(self, entity):
logging.debug(f"Resolving duplicates for entity: {entity}")
unique_id = self.config[entity]['unique_id']
unique_id = self.primary_keys[entity]['unique_id']
self.base_data[entity] = self.base_data[entity].sort(by='ingestion_date').unique(subset=unique_id, keep='first')
logging.debug(f"Successfully resolved duplicates for entity: {entity}")
+2 -1
View File
@@ -1,3 +1,4 @@
python-dotenv
polars
requests
requests
pyyaml