From 24cc006ef5d862e62ec658325a11eef4e070dacb Mon Sep 17 00:00:00 2001 From: Jake Pullen Date: Sat, 10 Aug 2024 17:11:57 +0100 Subject: [PATCH] Added more error handling and logic to ingest --- config/config.yaml | 4 +- config/exit_codes.py | 5 +++ pipeline/ingest.py | 97 ++++++++++++++++++++++++++++++++++---------- 3 files changed, 83 insertions(+), 23 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index e67fd95..83d3833 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -23,4 +23,6 @@ primary_keys: raw_data_path: data/raw processed_data_path: data/processed base_data_path: data/base -warehouse_data_path: data/warehouse \ No newline at end of file +warehouse_data_path: data/warehouse +REQUESTS_MAX_RETRIES: 3 +REQUESTS_RETRY_DELAY: 5 \ No newline at end of file diff --git a/config/exit_codes.py b/config/exit_codes.py index 4990cad..fea8f17 100644 --- a/config/exit_codes.py +++ b/config/exit_codes.py @@ -3,3 +3,8 @@ MISSING_ENV_VARS = 1 MISSING_CONFIG_FILE = 2 CORRUPTED_CONFIG_FILE = 3 UNAUTHORIZED_API_TOKEN = 4 +REQUESTS_ERROR = 5 +BAD_REQUEST = 6 +FORBIDDEN = 7 +NOT_FOUND = 8 +CONFLICT = 9 \ No newline at end of file diff --git a/pipeline/ingest.py b/pipeline/ingest.py index 2c77caf..416a55f 100644 --- a/pipeline/ingest.py +++ b/pipeline/ingest.py @@ -4,10 +4,13 @@ import json import logging import requests import sys +import yaml from typing import Dict, Any import config.exit_codes as ec class Ingest: + + def __init__(self, config: Dict[str, Any]): """ Initialize the Ingest class with the provided configuration. @@ -20,8 +23,18 @@ class Ingest: self.raw_data_path = config['raw_data_path'] self.headers = {'Authorization': f'Bearer {self.api_token}'} self.knowledge_cache = self.load_knowledge_cache() + self.load_additional_config() self.fetch_and_cache_entity_data() + def load_additional_config(self): + """ + Load additional configuration values from the config file. + """ + with open('config.config.yaml', 'r') as f: + config = yaml.safe_load(f) + self.MAX_RETRIES = config['REQUESTS_MAX_RETRIES'] + self.RETRY_DELAY = config['REQUESTS_RETRY_DELAY'] + def load_knowledge_cache(self) -> Dict[str, Any]: """ Load the knowledge cache from the file if it exists. @@ -40,8 +53,13 @@ class Ingest: if not os.path.exists(directory): os.makedirs(directory) entity_file = f'{directory}/{current_time}.json' - with open(entity_file, 'w') as f: - json.dump(data, f, indent=4) + logging.info(f"Saving {entity} data to {entity_file}") + try: + with open(entity_file, 'w') as f: + json.dump(data, f, indent=4) + except Exception as e: + logging.error(f"Error saving {entity} data: {e}") + def update_server_knowledge_cache(self, entity: str, server_knowledge: Any): """ @@ -51,8 +69,7 @@ class Ingest: with open(self.knowledge_file, 'r') as f: knowledge_cache = json.load(f) except FileNotFoundError: - # If the file does not exist, create an empty cache - # also create the file so we can save to it later + logging.info(f"Knowledge file not found. Creating a new one at {self.knowledge_file}. This is normal for the first run.") os.makedirs(os.path.dirname(self.knowledge_file), exist_ok=True) knowledge_cache = {} @@ -79,35 +96,68 @@ class Ingest: else: logging.warning("X-Rate-Limit header is missing.") + + def handle_response(self, response) -> bool: + if response.status_code == 400: + logging.error("Bad request. The request could not be understood by the API due to malformed syntax or validation errors.") + sys.exit(ec.BAD_REQUEST) + elif response.status_code == 401: + logging.error("Unauthorized. Please check your API token.") + sys.exit(ec.UNAUTHORIZED_API_TOKEN) + elif response.status_code == 403: + logging.error("Forbidden. Access is denied.") + sys.exit(ec.FORBIDDEN) + elif response.status_code == 404: + logging.error("Not found. The specified URI does not exist.") + sys.exit(ec.NOT_FOUND) + elif response.status_code == 409: + logging.error("Conflict. The resource cannot be saved due to a conflict.") + sys.exit(ec.CONFLICT) + elif response.status_code == 429: + logging.error("Too many requests. You have made too many requests in a short amount of time.") + return True + elif response.status_code == 500: + logging.error("Internal server error. The API experienced an unexpected error.") + return True + elif response.status_code == 503: + logging.error("Service unavailable. The API is temporarily disabled or a request timeout occurred.") + return True + else: + response.raise_for_status() + return False def fetch_and_cache_entity_data(self): """ Fetch and cache data for all entities. """ for entity in self.entities: - # if we already have files in the raw data folder, we need to skip that entity file_path = f'data/raw/{entity}' if os.path.exists(file_path) and os.listdir(file_path): - logging.warning(f"Skipping entity: {entity} as the raw data folder is not empty.") - continue + logging.warning(f"Raw data exists for {entity} processing any raw data we already have.") + break # break here instead of continue as we dont want to update our server knowledge cache and potentially miss data. + last_knowledge = self.knowledge_cache.get(entity, 0) - logging.debug(f'Last Knowledge of {entity.capitalize()}: {last_knowledge}') - url = f'{self.base_url}/{self.budget_id}/{entity}' - if last_knowledge: - logging.info(f'Fetching {entity} data since last knowledge: {last_knowledge}') - url = url + f'?last_knowledge_of_server={last_knowledge}' - - response = requests.get(url, headers=self.headers) - if response.status_code == 401: - logging.error("Unauthorized. Please check your API token.") - sys.exit(ec.UNAUTHORIZED_API_TOKEN) - - if self.check_rate_limit(response): - break + #logging.debug(f'Last Knowledge of {entity}: {last_knowledge}') + logging.info(f'Fetching {entity} data since last knowledge: {last_knowledge}') + url = f'{self.base_url}/{self.budget_id}/{entity}?last_knowledge_of_server={last_knowledge}' + + for attempt in range(self.MAX_RETRIES): + try: + response = requests.get(url, headers=self.headers) + should_retry = self.handle_response(response) + if not should_retry: + break # Exit the loop if the request is successful + except requests.exceptions.RequestException as e: + logging.error(f"Error fetching {entity} data (attempt {attempt + 1}/{self.MAX_RETRIES}): {e}") + if attempt < self.MAX_RETRIES - 1: + time.sleep(self.RETRY_DELAY) # Wait before retrying + else: + logging.error("Max retries reached. Exiting.") + sys.exit(ec.REQUESTS_ERROR) data = response.json() server_knowledge = data['data'].get('server_knowledge') - logging.debug(f'{entity.capitalize()} Server Knowledge: {server_knowledge}') + logging.debug(f'{entity} new server knowledge: {server_knowledge}') if server_knowledge is not None and server_knowledge != last_knowledge: self.update_server_knowledge_cache(entity, server_knowledge) @@ -115,4 +165,7 @@ class Ingest: entity_data.pop('server_knowledge', None) self.save_entity_data_to_raw(entity, entity_data) else: - logging.info(f"No new data for {entity}. Skipping cache update.") \ No newline at end of file + logging.info(f"No new data for {entity}. Skipping cache update.") + + if self.check_rate_limit(response): + break # break out here and continue processing the data we have.