From 812e1070139c3cce9ff4efd9c7e940a3ab93e213 Mon Sep 17 00:00:00 2001 From: Jake Pullen Date: Sat, 10 Aug 2024 18:36:39 +0100 Subject: [PATCH] Added more error handling in for raw to base --- config/exit_codes.py | 4 +- pipeline/ingest.py | 10 +-- pipeline/raw_to_base.py | 181 +++++++++++++++++++++++++--------------- 3 files changed, 119 insertions(+), 76 deletions(-) diff --git a/config/exit_codes.py b/config/exit_codes.py index fea8f17..07d3e17 100644 --- a/config/exit_codes.py +++ b/config/exit_codes.py @@ -7,4 +7,6 @@ REQUESTS_ERROR = 5 BAD_REQUEST = 6 FORBIDDEN = 7 NOT_FOUND = 8 -CONFLICT = 9 \ No newline at end of file +CONFLICT = 9 +MOVE_FILE_ERROR = 10 +DUPLICATE_RESOLUTION_ERROR = 11 \ No newline at end of file diff --git a/pipeline/ingest.py b/pipeline/ingest.py index 3827df4..6b25b27 100644 --- a/pipeline/ingest.py +++ b/pipeline/ingest.py @@ -23,17 +23,9 @@ class Ingest: self.raw_data_path = config['raw_data_path'] self.headers = {'Authorization': f'Bearer {self.api_token}'} self.knowledge_cache = self.load_knowledge_cache() - self.load_additional_config() - self.fetch_and_cache_entity_data() - - def load_additional_config(self): - """ - Load additional configuration values from the config file. - """ - with open('config/config.yaml', 'r') as f: - config = yaml.safe_load(f) self.MAX_RETRIES = config['REQUESTS_MAX_RETRIES'] self.RETRY_DELAY = config['REQUESTS_RETRY_DELAY'] + self.fetch_and_cache_entity_data() def load_knowledge_cache(self) -> Dict[str, Any]: """ diff --git a/pipeline/raw_to_base.py b/pipeline/raw_to_base.py index a4e2c7a..1652b38 100644 --- a/pipeline/raw_to_base.py +++ b/pipeline/raw_to_base.py @@ -1,8 +1,10 @@ import os import json import logging +import sys from datetime import datetime -from typing import List, Dict, Any +from typing import Dict, Any +import config.exit_codes as ec import polars as pl class RawToBase: @@ -14,15 +16,13 @@ class RawToBase: self.base_data_path = config['base_data_path'] self.data = {} self.base_data = {} - logging.basicConfig(level=logging.DEBUG) self.process_entities() - + def process_entities(self): for entity in self.entities: # check the file is in the raw data path, if not skip the entity folder_path = os.path.join(self.raw_data_path, entity) folder_contents = os.listdir(folder_path) - # Check if the folder is empty if not folder_contents: logging.warning(f"The folder {folder_path} is empty skipping {entity}.") continue @@ -31,61 +31,94 @@ class RawToBase: continue self._load_existing_base_data(entity) self._combine_data(entity) - self._resolve_duplicates(entity) - self._save_base_data(entity) - self._move_raw_to_processed(entity) + if not self._resolve_duplicates(entity): + logging.error(f"entity: {entity} failed duplicate resolution.") + sys.exit(ec.DUPLICATE_RESOLUTION_ERROR) + if not self._save_base_data(entity): + logging.warning(f"Skipping processing for entity: {entity} due to failed saving base data.") + continue + if not self._move_raw_to_processed(entity): + logging.error(f"entity: {entity} has been processed, but we could not move the file out of the raw folder, please clear the raw folder for {entity}.") + sys.exit(ec.MOVE_FILE_ERROR) def _load_raw_data(self, entity): entity_path = os.path.join(self.raw_data_path, entity) self.data[entity] = [] logging.debug(f"Loading data for entity: {entity} from path: {entity_path}") - for file_name in os.listdir(entity_path): - if file_name.endswith('.json'): - file_path = os.path.join(entity_path, file_name) - logging.debug(f"Reading file: {file_path}") - try: - with open(file_path, 'r') as f: - data = json.load(f) - # Check if the data is empty - if entity == "categories": - # Check if any category group has categories - has_categories = any(group.get("categories") for group in data.get("category_groups", [])) - if not has_categories: - logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.") - os.remove(file_path) - return False - else: - if not data.get(entity, []): - logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.") - # delete the file as it is empty - os.remove(file_path) - return False - modified_data = [] - if entity == 'categories': - for group in data.get('category_groups', []): - for category in group.get('categories', []): - category['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date() - modified_data.append(category) - else: - for record in data.get(f'{entity}', []): - if isinstance(record, dict): - record['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date() - modified_data.append(record) - else: - modified_data.append({'record': record, 'ingestion_date': datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()}) - self.data[entity].append(modified_data) - logging.debug(f"Successfully loaded data from file: {file_path}") - except Exception as e: - logging.error(f"Failed to load data from file: {file_path}, error: {e}") - exit(1) + files = [f for f in os.listdir(entity_path) if f.endswith('.json')] + + if len(files) > 1: + logging.error(f"""More than one file found in path: {entity_path}. Skipping processing for entity: {entity}. +recommended actions is to move the newest file(s) out, re-run main.py. +Then move the files back in one at a time oldest to newest and run again for each file""") + return False + + if len(files) == 1: + file_name = files[0] + file_path = os.path.join(entity_path, file_name) + logging.debug(f"Reading file: {file_path}") + try: + with open(file_path, 'r') as f: + data = json.load(f) + except Exception as e: + logging.error(f"Failed to load data from file: {file_path}, error: {e}") + return False + + if self._is_data_empty(entity, data, file_path): + return False + + modified_data = self._add_ingestion_date(entity, data, file_name) + + self.data[entity].append(modified_data) + logging.debug(f"Successfully loaded data from file: {file_path}") return True + def _is_data_empty(self, entity, data, file_path): + logging.debug(f"Checking if data is empty for entity: {entity}") + if entity == "categories": + has_categories = any(group.get("categories") for group in data.get("category_groups", [])) + if not has_categories: + logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.") + os.remove(file_path) + return True + else: + if not data.get(entity, []): + logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.") + os.remove(file_path) + return True + logging.debug(f"Data is not empty for entity: {entity}") + return False + + def _add_ingestion_date(self, entity, data, file_name): + modified_data = [] + ingestion_date = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date() + + logging.debug(f"Adding ingestion date to data for entity: {entity}") + if entity == 'categories': + for group in data.get('category_groups', []): + for category in group.get('categories', []): + category['ingestion_date'] = ingestion_date + modified_data.append(category) + else: + for record in data.get(f'{entity}', []): + if isinstance(record, dict): + record['ingestion_date'] = ingestion_date + modified_data.append(record) + else: + modified_data.append({'record': record, 'ingestion_date': ingestion_date}) + logging.debug(f"Successfully added ingestion date to data for entity: {entity}") + return modified_data + def _load_existing_base_data(self, entity): base_path = os.path.join(self.base_data_path, f'{entity}.parquet') if os.path.exists(base_path): logging.debug(f"Loading existing base data for entity: {entity} from path: {base_path}") - self.base_data[entity] = pl.read_parquet(base_path) + try: + self.base_data[entity] = pl.read_parquet(base_path) + except Exception as e: + logging.error(f"Failed to load existing base data for entity: {entity}, error: {e}, Creating an empty DataFrame") + self.base_data[entity] = pl.DataFrame() logging.debug(f"Successfully loaded existing base data for entity: {entity}") else: self.base_data[entity] = pl.DataFrame() @@ -103,7 +136,6 @@ class RawToBase: combined_data.extend(data) new_data_df = pl.DataFrame(combined_data) - #print(new_data_df) # Ensure the unique id column is preserved unique_id = self.primary_keys[entity]['unique_id'] @@ -117,35 +149,52 @@ class RawToBase: def _resolve_duplicates(self, entity): logging.debug(f"Resolving duplicates for entity: {entity}") unique_id = self.primary_keys[entity]['unique_id'] - self.base_data[entity] = self.base_data[entity].sort(by='ingestion_date').unique(subset=unique_id, keep='first') + try: + self.base_data[entity] = self.base_data[entity].sort(by='ingestion_date').unique(subset=unique_id, keep='first') + except Exception as e: + logging.error(f"Failed to resolve duplicates for entity: {entity}, error: {e}") + return False logging.debug(f"Successfully resolved duplicates for entity: {entity}") + return True def _save_base_data(self, entity): os.makedirs(self.base_data_path, exist_ok=True) file_path = os.path.join(self.base_data_path, f'{entity}.parquet') - self.base_data[entity].write_parquet(file_path) + try: + self.base_data[entity].write_parquet(file_path) + except Exception as e: + logging.error(f"Failed to save base data for entity: {entity}, error: {e}") + return False + logging.debug(f"Saved base data for entity: {entity} to path: {file_path}") def _move_raw_to_processed(self, entity): raw_entity_path = os.path.join(self.raw_data_path, entity) processed_path = os.path.join(self.processed_data_path, entity) - # logging.debug(f"Raw entity path: {raw_entity_path}") - # logging.debug(f"Processed path: {processed_path}") - os.makedirs(processed_path, exist_ok=True) - for file_name in os.listdir(raw_entity_path): - if file_name.endswith('.json'): - raw_file_path = os.path.join(raw_entity_path, file_name) - processed_file_path = os.path.join(processed_path, file_name) - - logging.debug(f"Moving file: {raw_file_path} to {processed_file_path}") - - if os.path.exists(raw_file_path): - os.rename(raw_file_path, processed_file_path) - logging.debug(f"Moved file: {file_name}") - else: - logging.error(f"File not found: {raw_file_path}") + try: + files = [f for f in os.listdir(raw_entity_path) if f.endswith('.json')] + if len(files) != 1: + logging.error(f"Expected exactly one file in path: {raw_entity_path}, but found {len(files)}") + return False + + file_name = files[0] + raw_file_path = os.path.join(raw_entity_path, file_name) + processed_file_path = os.path.join(processed_path, file_name) + + logging.debug(f"Moving file: {raw_file_path} to {processed_file_path}") + + os.rename(raw_file_path, processed_file_path) + logging.debug(f"Moved file: {file_name} to processed") - logging.debug(f"Moved processed files for entity: {entity} to path: {processed_path}") \ No newline at end of file + except FileNotFoundError as e: + logging.error(f"File not found: {e}") + return False + except Exception as e: + logging.error(f"Failed to move file for entity: {entity}, error: {e}") + return False + + logging.debug(f"Moved processed file for entity: {entity} to path: {processed_path}") + return True \ No newline at end of file