Added more error handling and logic to ingest
This commit is contained in:
@@ -24,3 +24,5 @@ raw_data_path: data/raw
|
|||||||
processed_data_path: data/processed
|
processed_data_path: data/processed
|
||||||
base_data_path: data/base
|
base_data_path: data/base
|
||||||
warehouse_data_path: data/warehouse
|
warehouse_data_path: data/warehouse
|
||||||
|
REQUESTS_MAX_RETRIES: 3
|
||||||
|
REQUESTS_RETRY_DELAY: 5
|
||||||
@@ -3,3 +3,8 @@ MISSING_ENV_VARS = 1
|
|||||||
MISSING_CONFIG_FILE = 2
|
MISSING_CONFIG_FILE = 2
|
||||||
CORRUPTED_CONFIG_FILE = 3
|
CORRUPTED_CONFIG_FILE = 3
|
||||||
UNAUTHORIZED_API_TOKEN = 4
|
UNAUTHORIZED_API_TOKEN = 4
|
||||||
|
REQUESTS_ERROR = 5
|
||||||
|
BAD_REQUEST = 6
|
||||||
|
FORBIDDEN = 7
|
||||||
|
NOT_FOUND = 8
|
||||||
|
CONFLICT = 9
|
||||||
+69
-16
@@ -4,10 +4,13 @@ import json
|
|||||||
import logging
|
import logging
|
||||||
import requests
|
import requests
|
||||||
import sys
|
import sys
|
||||||
|
import yaml
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any
|
||||||
import config.exit_codes as ec
|
import config.exit_codes as ec
|
||||||
|
|
||||||
class Ingest:
|
class Ingest:
|
||||||
|
|
||||||
|
|
||||||
def __init__(self, config: Dict[str, Any]):
|
def __init__(self, config: Dict[str, Any]):
|
||||||
"""
|
"""
|
||||||
Initialize the Ingest class with the provided configuration.
|
Initialize the Ingest class with the provided configuration.
|
||||||
@@ -20,8 +23,18 @@ class Ingest:
|
|||||||
self.raw_data_path = config['raw_data_path']
|
self.raw_data_path = config['raw_data_path']
|
||||||
self.headers = {'Authorization': f'Bearer {self.api_token}'}
|
self.headers = {'Authorization': f'Bearer {self.api_token}'}
|
||||||
self.knowledge_cache = self.load_knowledge_cache()
|
self.knowledge_cache = self.load_knowledge_cache()
|
||||||
|
self.load_additional_config()
|
||||||
self.fetch_and_cache_entity_data()
|
self.fetch_and_cache_entity_data()
|
||||||
|
|
||||||
|
def load_additional_config(self):
    """
    Load the request retry settings from the YAML config file.

    Reads ``REQUESTS_MAX_RETRIES`` and ``REQUESTS_RETRY_DELAY`` from the file
    and stores them on the instance as ``self.MAX_RETRIES`` and
    ``self.RETRY_DELAY``.

    Exits the process with ``ec.MISSING_CONFIG_FILE`` when the file cannot be
    found, and with ``ec.CORRUPTED_CONFIG_FILE`` when it cannot be parsed or
    does not contain both settings.
    """
    # The previous value, 'config.config.yaml', was a dotted module path used
    # as a file name. NOTE(review): assumes config.yaml sits in the config/
    # directory next to exit_codes and that the script runs from the project
    # root - confirm.
    config_path = 'config/config.yaml'

    try:
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)
    except FileNotFoundError:
        logging.error(f"Config file not found: {config_path}")
        sys.exit(ec.MISSING_CONFIG_FILE)
    except yaml.YAMLError as e:
        logging.error(f"Config file {config_path} could not be parsed: {e}")
        sys.exit(ec.CORRUPTED_CONFIG_FILE)

    try:
        self.MAX_RETRIES = config['REQUESTS_MAX_RETRIES']
        self.RETRY_DELAY = config['REQUESTS_RETRY_DELAY']
    except (KeyError, TypeError) as e:
        # TypeError covers an empty file, for which safe_load returns None.
        logging.error(f"Config file {config_path} is missing a required retry setting: {e}")
        sys.exit(ec.CORRUPTED_CONFIG_FILE)
|
||||||
|
|
||||||
def load_knowledge_cache(self) -> Dict[str, Any]:
|
def load_knowledge_cache(self) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Load the knowledge cache from the file if it exists.
|
Load the knowledge cache from the file if it exists.
|
||||||
@@ -40,8 +53,13 @@ class Ingest:
|
|||||||
if not os.path.exists(directory):
|
if not os.path.exists(directory):
|
||||||
os.makedirs(directory)
|
os.makedirs(directory)
|
||||||
entity_file = f'{directory}/{current_time}.json'
|
entity_file = f'{directory}/{current_time}.json'
|
||||||
|
logging.info(f"Saving {entity} data to {entity_file}")
|
||||||
|
try:
|
||||||
with open(entity_file, 'w') as f:
|
with open(entity_file, 'w') as f:
|
||||||
json.dump(data, f, indent=4)
|
json.dump(data, f, indent=4)
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Error saving {entity} data: {e}")
|
||||||
|
|
||||||
|
|
||||||
def update_server_knowledge_cache(self, entity: str, server_knowledge: Any):
|
def update_server_knowledge_cache(self, entity: str, server_knowledge: Any):
|
||||||
"""
|
"""
|
||||||
@@ -51,8 +69,7 @@ class Ingest:
|
|||||||
with open(self.knowledge_file, 'r') as f:
|
with open(self.knowledge_file, 'r') as f:
|
||||||
knowledge_cache = json.load(f)
|
knowledge_cache = json.load(f)
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
# If the file does not exist, create an empty cache
|
logging.info(f"Knowledge file not found. Creating a new one at {self.knowledge_file}. This is normal for the first run.")
|
||||||
# also create the file so we can save to it later
|
|
||||||
os.makedirs(os.path.dirname(self.knowledge_file), exist_ok=True)
|
os.makedirs(os.path.dirname(self.knowledge_file), exist_ok=True)
|
||||||
knowledge_cache = {}
|
knowledge_cache = {}
|
||||||
|
|
||||||
@@ -80,34 +97,67 @@ class Ingest:
|
|||||||
else:
|
else:
|
||||||
logging.warning("X-Rate-Limit header is missing.")
|
logging.warning("X-Rate-Limit header is missing.")
|
||||||
|
|
||||||
|
def handle_response(self, response) -> bool:
    """
    Decide what to do with an API response based on its HTTP status code.

    Statuses 400, 401, 403, 404 and 409 are logged and terminate the process
    with the matching exit code from ``ec``. Statuses 429, 500 and 503 are
    logged and reported as retryable. Any other status is passed to
    ``response.raise_for_status()``.

    Returns:
        True when the caller should retry the request, False when
        ``raise_for_status()`` did not raise.
    """
    status = response.status_code

    # status -> (log message, name of the exit code constant in ``ec``).
    # The constant is looked up by name only when the branch is actually taken.
    fatal_statuses = {
        400: ("Bad request. The request could not be understood by the API due to malformed syntax or validation errors.", "BAD_REQUEST"),
        401: ("Unauthorized. Please check your API token.", "UNAUTHORIZED_API_TOKEN"),
        403: ("Forbidden. Access is denied.", "FORBIDDEN"),
        404: ("Not found. The specified URI does not exist.", "NOT_FOUND"),
        409: ("Conflict. The resource cannot be saved due to a conflict.", "CONFLICT"),
    }

    # status -> log message for responses the caller is told to retry.
    retryable_statuses = {
        429: "Too many requests. You have made too many requests in a short amount of time.",
        500: "Internal server error. The API experienced an unexpected error.",
        503: "Service unavailable. The API is temporarily disabled or a request timeout occurred.",
    }

    if status in fatal_statuses:
        message, exit_code_name = fatal_statuses[status]
        logging.error(message)
        sys.exit(getattr(ec, exit_code_name))
    elif status in retryable_statuses:
        logging.error(retryable_statuses[status])
        return True
    else:
        response.raise_for_status()
        return False
|
||||||
|
|
||||||
def fetch_and_cache_entity_data(self):
|
def fetch_and_cache_entity_data(self):
|
||||||
"""
|
"""
|
||||||
Fetch and cache data for all entities.
|
Fetch and cache data for all entities.
|
||||||
"""
|
"""
|
||||||
for entity in self.entities:
|
for entity in self.entities:
|
||||||
# if we already have files in the raw data folder, we need to skip that entity
|
|
||||||
file_path = f'data/raw/{entity}'
|
file_path = f'data/raw/{entity}'
|
||||||
if os.path.exists(file_path) and os.listdir(file_path):
|
if os.path.exists(file_path) and os.listdir(file_path):
|
||||||
logging.warning(f"Skipping entity: {entity} as the raw data folder is not empty.")
|
logging.warning(f"Raw data exists for {entity} processing any raw data we already have.")
|
||||||
continue
|
break # break here instead of continue as we dont want to update our server knowledge cache and potentially miss data.
|
||||||
|
|
||||||
last_knowledge = self.knowledge_cache.get(entity, 0)
|
last_knowledge = self.knowledge_cache.get(entity, 0)
|
||||||
logging.debug(f'Last Knowledge of {entity.capitalize()}: {last_knowledge}')
|
#logging.debug(f'Last Knowledge of {entity}: {last_knowledge}')
|
||||||
url = f'{self.base_url}/{self.budget_id}/{entity}'
|
|
||||||
if last_knowledge:
|
|
||||||
logging.info(f'Fetching {entity} data since last knowledge: {last_knowledge}')
|
logging.info(f'Fetching {entity} data since last knowledge: {last_knowledge}')
|
||||||
url = url + f'?last_knowledge_of_server={last_knowledge}'
|
url = f'{self.base_url}/{self.budget_id}/{entity}?last_knowledge_of_server={last_knowledge}'
|
||||||
|
|
||||||
|
for attempt in range(self.MAX_RETRIES):
|
||||||
|
try:
|
||||||
response = requests.get(url, headers=self.headers)
|
response = requests.get(url, headers=self.headers)
|
||||||
if response.status_code == 401:
|
should_retry = self.handle_response(response)
|
||||||
logging.error("Unauthorized. Please check your API token.")
|
if not should_retry:
|
||||||
sys.exit(ec.UNAUTHORIZED_API_TOKEN)
|
break # Exit the loop if the request is successful
|
||||||
|
except requests.exceptions.RequestException as e:
|
||||||
if self.check_rate_limit(response):
|
logging.error(f"Error fetching {entity} data (attempt {attempt + 1}/{self.MAX_RETRIES}): {e}")
|
||||||
break
|
if attempt < self.MAX_RETRIES - 1:
|
||||||
|
time.sleep(self.RETRY_DELAY) # Wait before retrying
|
||||||
|
else:
|
||||||
|
logging.error("Max retries reached. Exiting.")
|
||||||
|
sys.exit(ec.REQUESTS_ERROR)
|
||||||
|
|
||||||
data = response.json()
|
data = response.json()
|
||||||
server_knowledge = data['data'].get('server_knowledge')
|
server_knowledge = data['data'].get('server_knowledge')
|
||||||
logging.debug(f'{entity.capitalize()} Server Knowledge: {server_knowledge}')
|
logging.debug(f'{entity} new server knowledge: {server_knowledge}')
|
||||||
|
|
||||||
if server_knowledge is not None and server_knowledge != last_knowledge:
|
if server_knowledge is not None and server_knowledge != last_knowledge:
|
||||||
self.update_server_knowledge_cache(entity, server_knowledge)
|
self.update_server_knowledge_cache(entity, server_knowledge)
|
||||||
@@ -116,3 +166,6 @@ class Ingest:
|
|||||||
self.save_entity_data_to_raw(entity, entity_data)
|
self.save_entity_data_to_raw(entity, entity_data)
|
||||||
else:
|
else:
|
||||||
logging.info(f"No new data for {entity}. Skipping cache update.")
|
logging.info(f"No new data for {entity}. Skipping cache update.")
|
||||||
|
|
||||||
|
if self.check_rate_limit(response):
|
||||||
|
break # break out here and continue processing the data we have.
|
||||||
|
|||||||
Reference in New Issue
Block a user