From fe99ee4469bd4a8b53fa7e36954d11f21265be3d Mon Sep 17 00:00:00 2001 From: Jake Pullen Date: Sun, 28 Jul 2024 10:02:18 +0100 Subject: [PATCH] initial commit, injest POC done --- .gitignore | 5 ++++ injest.py | 82 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ main.py | 15 ++++++++++ 3 files changed, 102 insertions(+) create mode 100644 .gitignore create mode 100644 injest.py create mode 100644 main.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..12d1859 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +*.env +*.csv +*.log +server_knowledge_cache.json +data/* \ No newline at end of file diff --git a/injest.py b/injest.py new file mode 100644 index 0000000..59e2c1e --- /dev/null +++ b/injest.py @@ -0,0 +1,82 @@ +class injest: + def __init__(self): + + + def update_entity_data_cache(entity, data): + current_time = time.strftime('%Y%m%d%H%M%S') + directory = f'data/{entity}' # Directory name is the entity's name + if not os.path.exists(directory): + os.makedirs(directory) # Create the directory if it does not exist + entity_file = f'{directory}/{current_time}.json' # Separate file for each entity's data + with open(entity_file, 'w') as f: + json.dump(data, f, indent=4) + + def update_server_knowledge_cache(entity, server_knowledge): + knowledge_file = 'server_knowledge_cache.json' + try: + with open(knowledge_file, 'r') as f: + knowledge_cache = json.load(f) + except FileNotFoundError: + knowledge_cache = {} + + knowledge_cache[entity] = server_knowledge + + with open(knowledge_file, 'w') as f: + json.dump(knowledge_cache, f, indent=4) + + def check_rate_limit(response): + rate_limit_header = response.headers.get('X-Rate-Limit') + if rate_limit_header: + requests_made, limit = map(int, rate_limit_header.split('/')) + remaining_requests = limit - requests_made + logging.info(f"Rate Limit: {remaining_requests}/{limit} requests remaining.") + if remaining_requests < 20: # Arbitrary low number to start handling the limit + logging.warning("Approaching rate limit. Consider pausing further requests.") + # Implement pause or delay logic here if necessary + else: + logging.warning("X-Rate-Limit header is missing.") + + def fetch_and_cache_entity_data(budget_id): + entities = ['accounts', 'categories', 'months', 'payees', 'transactions', 'scheduled_transactions'] + base_url = 'https://api.ynab.com/v1/budgets' + knowledge_file = 'server_knowledge_cache.json' + + # Load existing server knowledge cache + try: + with open(knowledge_file, 'r') as f: + knowledge_cache = json.load(f) + except FileNotFoundError: + knowledge_cache = {} + + for entity in entities: + last_knowledge = knowledge_cache.get(entity, 0) + logging.debug(f'Last Knowledge of {entity.capitalize()}: {last_knowledge}') + url = f'{base_url}/{budget_id}/{entity}' + if last_knowledge: + logging.info(f'Fetching {entity} data since last knowledge: {last_knowledge}') + url = url + f'?last_knowledge_of_server={last_knowledge}' + + response = requests.get(url, headers=headers) + check_rate_limit(response) # Check and handle rate limit + + if response.status_code == 429: # HTTP 429 Too Many Requests + logging.error("Rate limit exceeded. Pausing until the limit is reset.") + # Implement pause until the limit reset logic here + break + + data = response.json() + + server_knowledge = data['data'].get('server_knowledge') + logging.debug(f'{entity.capitalize()} Server Knowledge: {server_knowledge}') + + # Check if there is new server knowledge + if server_knowledge is not None and server_knowledge != last_knowledge: + # Update server knowledge cache + update_server_knowledge_cache(entity, server_knowledge) + + # Update entity data cache without server knowledge + entity_data = data['data'] + entity_data.pop('server_knowledge', None) # Remove server knowledge if exists + update_entity_data_cache(entity, entity_data) + else: + logging.info(f"No new data for {entity}. Skipping cache update.") diff --git a/main.py b/main.py new file mode 100644 index 0000000..9014473 --- /dev/null +++ b/main.py @@ -0,0 +1,15 @@ +import requests +import os +import json +import dotenv +import logging +import time +from injest import injest + +dotenv.load_dotenv() + +API_TOKEN = os.getenv('API_TOKEN') +BUDGET_ID = os.getenv('BUDGET_ID') +headers = {'Authorization': f'Bearer {API_TOKEN}'} +logging.basicConfig(level=logging.DEBUG) +