From d25136fbb0461565fa30f3595531ae5ad7d73d7a Mon Sep 17 00:00:00 2001 From: Jake Pullen Date: Sat, 10 Aug 2024 15:54:31 +0100 Subject: [PATCH] Added some more error handling and request limit handling --- config/exit_codes.py | 2 ++ main.py | 24 +++++++++++++++++------- pipeline/ingest.py | 14 ++++++++------ 3 files changed, 27 insertions(+), 13 deletions(-) diff --git a/config/exit_codes.py b/config/exit_codes.py index ea0bdbe..4990cad 100644 --- a/config/exit_codes.py +++ b/config/exit_codes.py @@ -1,3 +1,5 @@ SUCCESS = 0 MISSING_ENV_VARS = 1 MISSING_CONFIG_FILE = 2 +CORRUPTED_CONFIG_FILE = 3 +UNAUTHORIZED_API_TOKEN = 4 diff --git a/main.py b/main.py index d76d046..17a089d 100644 --- a/main.py +++ b/main.py @@ -14,12 +14,14 @@ from pipeline.dimensions import DimAccounts, DimCategories, DimPayees, DimDate from pipeline.facts import FactTransactions, FactScheduledTransactions def set_up_logging(): - with open('config/logging_config.yaml', 'r') as f: - try: + try: + with open('config/logging_config.yaml', 'r') as f: log_config = yaml.safe_load(f) - except yaml.YAMLError as e: - print(e) - logging.config.dictConfig(log_config) + logging.config.dictConfig(log_config) + except yaml.YAMLError as e: + print(f"Error parsing logging configuration file: {e}") + log_config = {} # Initialize log_config to an empty dictionary + logging.basicConfig(level=logging.INFO) # Fallback to a basic configuration queue_handler = logging.getHandlerByName('queue_handler') if queue_handler is not None: queue_handler.listener.start() @@ -28,6 +30,7 @@ def set_up_logging(): logger = logging.getLogger("data_pipeline_for_ynab") os.makedirs('logs', exist_ok=True) set_up_logging() + # Load environment variables dotenv.load_dotenv() @@ -39,8 +42,15 @@ def main(): logging.error('API_TOKEN or BUDGET_ID is not set in .env file') sys.exit(ec.MISSING_ENV_VARS) - with open('config/config.yaml', 'r') as file: - config = yaml.safe_load(file) + try: + with open('config/config.yaml', 'r') as file: + config = yaml.safe_load(file) + except FileNotFoundError: + logging.error('config.yaml file not found') + sys.exit(ec.MISSING_CONFIG_FILE) + except yaml.YAMLError as e: + logging.error(f'Error loading config.yaml: {e}') + sys.exit(ec.CORRUPTED_CONFIG_FILE) config['API_TOKEN'] = API_TOKEN config['BUDGET_ID'] = BUDGET_ID diff --git a/pipeline/ingest.py b/pipeline/ingest.py index e49e4bf..2c77caf 100644 --- a/pipeline/ingest.py +++ b/pipeline/ingest.py @@ -3,7 +3,9 @@ import time import json import logging import requests +import sys from typing import Dict, Any +import config.exit_codes as ec class Ingest: def __init__(self, config: Dict[str, Any]): @@ -71,6 +73,10 @@ class Ingest: if remaining_requests < 20: logging.warning("Approaching rate limit. Consider pausing further requests.") # Implement pause or delay logic here if necessary + if remaining_requests == 1: + logging.error("Rate limit exceeded. ending requests here and moving on with what we have.") + return True #returning True here to break out of any more ingestions + else: logging.warning("X-Rate-Limit header is missing.") @@ -94,13 +100,9 @@ class Ingest: response = requests.get(url, headers=self.headers) if response.status_code == 401: logging.error("Unauthorized. Please check your API token.") - break + sys.exit(ec.UNAUTHORIZED_API_TOKEN) - self.check_rate_limit(response) - - if response.status_code == 429: - logging.error("Rate limit exceeded. Pausing until the limit is reset.") - # Implement pause until the limit reset logic here + if self.check_rate_limit(response): break data = response.json()