diff --git a/.gitignore b/.gitignore index e39bb04..364b460 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,5 @@ data/* .venv/* __pycache__/* */__pycache__/* -*.pbix \ No newline at end of file +*.pbix +/logs/* \ No newline at end of file diff --git a/config/__init__.py b/config/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/config.yaml b/config/config.yaml similarity index 85% rename from config.yaml rename to config/config.yaml index e67fd95..83d3833 100644 --- a/config.yaml +++ b/config/config.yaml @@ -23,4 +23,6 @@ primary_keys: raw_data_path: data/raw processed_data_path: data/processed base_data_path: data/base -warehouse_data_path: data/warehouse \ No newline at end of file +warehouse_data_path: data/warehouse +REQUESTS_MAX_RETRIES: 3 +REQUESTS_RETRY_DELAY: 5 \ No newline at end of file diff --git a/config/custom_json_logger.py b/config/custom_json_logger.py new file mode 100644 index 0000000..f0a9ab3 --- /dev/null +++ b/config/custom_json_logger.py @@ -0,0 +1,41 @@ +import datetime as dt +import json +import logging +from typing import override + +class custom_json_logger(logging.Formatter): + def __init__( + self, + *, + format_keys: dict[str,str] | None = None, + ): + super().__init__() + self.format_keys = format_keys if format_keys is not None else {} + + @override + def format(self, record: logging.LogRecord) -> str: + record_dict = self._prepare_log_dict(record) + return json.dumps(record_dict, default=str) + + def _prepare_log_dict(self, record: logging.LogRecord) -> dict: + always_fields = { + "message" : record.getMessage(), + "timestamp" : dt.datetime.fromtimestamp( + record.created, tz=dt.timezone.utc + ).isoformat(), + } + if record.exc_info is not None: + always_fields["exc_info"] = self.formatException(record.exc_info) + + if record.stack_info is not None: + always_fields["stack_info"] = self.formatStack(record.stack_info) + + message = { + key: msg_val + if (msg_val := always_fields.pop(val, None)) is not None + else getattr(record, val) + for key, val in self.format_keys.items() + } + message.update(always_fields) + return message + diff --git a/config/exit_codes.py b/config/exit_codes.py new file mode 100644 index 0000000..07d3e17 --- /dev/null +++ b/config/exit_codes.py @@ -0,0 +1,12 @@ +SUCCESS = 0 +MISSING_ENV_VARS = 1 +MISSING_CONFIG_FILE = 2 +CORRUPTED_CONFIG_FILE = 3 +UNAUTHORIZED_API_TOKEN = 4 +REQUESTS_ERROR = 5 +BAD_REQUEST = 6 +FORBIDDEN = 7 +NOT_FOUND = 8 +CONFLICT = 9 +MOVE_FILE_ERROR = 10 +DUPLICATE_RESOLUTION_ERROR = 11 \ No newline at end of file diff --git a/config/logging_config.yaml b/config/logging_config.yaml new file mode 100644 index 0000000..d424e75 --- /dev/null +++ b/config/logging_config.yaml @@ -0,0 +1,41 @@ +version: 1 +disable_existing_loggers: False +formatters: + simple: + format: "%(asctime)s - %(levelname)s - %(module)s - %(funcName)s - %(message)s" + datefmt: "%Y-%m-%d %H:%M:%S%z" + json: + "()": config.custom_json_logger.custom_json_logger + format_keys: + level: levelname + timestamp: timestamp + logger: name + module: module + function: funcName + line: lineno + message: message + thread_name: threadName +handlers: + stderr: + class: logging.StreamHandler + level: INFO + formatter: simple + stream: ext://sys.stdout + file: + class: logging.handlers.RotatingFileHandler + level: DEBUG + formatter: json + filename: logs/dpfy_log.jsonl + maxBytes: 10485760 # 10MB + backupCount: 10 + queue_handler: + class: logging.handlers.QueueHandler + handlers: + - stderr + - file + respect_handler_level: True +loggers: + root: + level: DEBUG + handlers: + - queue_handler diff --git a/main.py b/main.py index 8ebc54d..33e686b 100644 --- a/main.py +++ b/main.py @@ -2,25 +2,61 @@ import os import dotenv import logging import yaml +import sys +import atexit +import logging.config +import logging.handlers +import config.exit_codes as ec from pipeline.ingest import Ingest from pipeline.raw_to_base import RawToBase from pipeline.dimensions import DimAccounts, DimCategories, DimPayees, DimDate from pipeline.facts import FactTransactions, FactScheduledTransactions +def set_up_logging(): + try: + with open('config/logging_config.yaml', 'r') as f: + log_config = yaml.safe_load(f) + logging.config.dictConfig(log_config) + except yaml.YAMLError as e: + print(f"Error parsing logging configuration file: {e}") + log_config = {} # Initialize log_config to an empty dictionary + logging.basicConfig(level=logging.INFO) # Fallback to a basic configuration + queue_handler = logging.getHandlerByName('queue_handler') + if queue_handler is not None: + queue_handler.listener.start() + atexit.register(queue_handler.listener.stop) + +logger = logging.getLogger("data_pipeline_for_ynab") +os.makedirs('logs', exist_ok=True) +set_up_logging() + +# Load environment variables dotenv.load_dotenv() API_TOKEN = os.getenv('API_TOKEN') BUDGET_ID = os.getenv('BUDGET_ID') -logging.basicConfig(level=logging.DEBUG) -with open('config.yaml', 'r') as file: - config = yaml.safe_load(file) +def main(): + if not API_TOKEN or not BUDGET_ID: + logging.error('API_TOKEN or BUDGET_ID is not set in .env file') + sys.exit(ec.MISSING_ENV_VARS) -config['API_TOKEN'] = API_TOKEN -config['BUDGET_ID'] = BUDGET_ID + try: + with open('config/config.yaml', 'r') as file: + config = yaml.safe_load(file) + except FileNotFoundError: + logging.error('config.yaml file not found') + sys.exit(ec.MISSING_CONFIG_FILE) + except yaml.YAMLError as e: + logging.error(f'Error loading config.yaml: {e}') + sys.exit(ec.CORRUPTED_CONFIG_FILE) + + config['API_TOKEN'] = API_TOKEN + config['BUDGET_ID'] = BUDGET_ID + + logging.info('Starting data pipeline') -if __name__ == '__main__': Ingest(config) RawToBase(config) DimAccounts(config) @@ -29,3 +65,17 @@ if __name__ == '__main__': DimDate(config) FactTransactions(config) FactScheduledTransactions(config) + + logging.info('Data pipeline completed successfully') + sys.exit(ec.SUCCESS) + +if __name__ == '__main__': + try: + main() + except SystemExit as e: + exit_code = e.code + if exit_code == ec.SUCCESS: + logging.info('Program exited successfully') + else: + logging.error(f'Program exited with code {exit_code}') + raise diff --git a/pipeline/dimensions.py b/pipeline/dimensions.py index 3fed4f7..af0080b 100644 --- a/pipeline/dimensions.py +++ b/pipeline/dimensions.py @@ -21,40 +21,51 @@ class DimAccounts(Dimensions): def transform(self): # Read the parquet file into a polars DataFrame - accounts_df = pl.read_parquet(self.file_path) + try: + accounts_df = pl.read_parquet(self.file_path) + except Exception as e: + logging.error(f"Failed to read the base accounts parquet file: {e}") + return # Transform the DataFrame logging.info("Transforming the accounts DataFrame") - accounts_df = ( - accounts_df - .with_columns([ - pl.col("id").alias("account_id"), - pl.col("name").alias("account_name"), - pl.col("type").alias("account_type"), - pl.col("on_budget").alias("on_budget"), - pl.col("closed").alias("closed"), - pl.col("note").alias("note"), - pl.col("balance").alias("balance"), - pl.col("cleared_balance").alias("cleared_balance"), - pl.col("uncleared_balance").alias("uncleared_balance"), - pl.col("deleted").alias("deleted"), - ]) - .with_columns([ - pl.col("note").fill_null("unknown"), - (pl.col("balance") / 100).alias("balance"), - (pl.col("cleared_balance") / 100).alias("cleared_balance"), - (pl.col("uncleared_balance") / 100).alias("uncleared_balance"), - ]) - .drop([ - "transfer_payee_id", "direct_import_linked", "direct_import_in_error", - "last_reconciled_at", "debt_original_balance", "debt_interest_rates", - "debt_minimum_payments", "debt_escrow_amounts", "ingestion_date" - ]) - ) + try: + accounts_df = ( + accounts_df + .with_columns([ + pl.col("id").alias("account_id"), + pl.col("name").alias("account_name"), + pl.col("type").alias("account_type"), + pl.col("on_budget").alias("on_budget"), + pl.col("closed").alias("closed"), + pl.col("note").alias("note"), + pl.col("balance").alias("balance"), + pl.col("cleared_balance").alias("cleared_balance"), + pl.col("uncleared_balance").alias("uncleared_balance"), + pl.col("deleted").alias("deleted"), + ]) + .with_columns([ + pl.col("note").fill_null("unknown"), + (pl.col("balance") / 100).alias("balance"), + (pl.col("cleared_balance") / 100).alias("cleared_balance"), + (pl.col("uncleared_balance") / 100).alias("uncleared_balance"), + ]) + .drop([ + "transfer_payee_id", "direct_import_linked", "direct_import_in_error", + "last_reconciled_at", "debt_original_balance", "debt_interest_rates", + "debt_minimum_payments", "debt_escrow_amounts", "ingestion_date" + ]) + ) + except Exception as e: + logging.error(f"Failed to transform the accounts DataFrame: {e}") + return # Write the DataFrame to a new parquet file logging.info("Writing the transformed accounts DataFrame to parquet file") - accounts_df.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet') - + try: + accounts_df.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet') + except Exception as e: + logging.error(f"Failed to write the transformed accounts DataFrame to parquet file: {e}") + return class DimCategories(Dimensions): def __init__(self, config): @@ -64,36 +75,52 @@ class DimCategories(Dimensions): def transform(self): # Read the parquet file into a polars DataFrame - categories_df = pl.read_parquet(self.file_path) + try: + categories_df = pl.read_parquet(self.file_path) + except Exception as e: + logging.error(f"Failed to read the base categories parquet file: {e}") + return logging.info("Transforming the categories DataFrame") - # Select the required columns - categories_df = categories_df.select([ - 'id', - 'name', - 'category_group_name', - 'hidden', - 'note', - 'budgeted', - 'activity', - 'balance', - 'deleted' - ]) - # Rename the columns - categories_df = categories_df.with_columns(pl.col('id').alias('category_id')) - categories_df = categories_df.with_columns(pl.col('name').alias('category_name')) + try: + categories_df = categories_df.select([ + 'id', + 'name', + 'category_group_name', + 'hidden', + 'note', + 'budgeted', + 'activity', + 'balance', + 'deleted' + ]) + except Exception as e: + logging.error(f"Failed to select columns from the categories DataFrame: {e}") + return + + try: + # Rename the columns + categories_df = categories_df.with_columns(pl.col('id').alias('category_id')) + categories_df = categories_df.with_columns(pl.col('name').alias('category_name')) - # Fill null values in the note column - categories_df = categories_df.with_columns(pl.col('note').fill_null('unknown')) + # Fill null values in the note column + categories_df = categories_df.with_columns(pl.col('note').fill_null('unknown')) - # Convert the balance, budgeted, and activity columns to decimal - categories_df = categories_df.with_columns(pl.col('balance') / 100) - categories_df = categories_df.with_columns(pl.col('budgeted') / 100) - categories_df = categories_df.with_columns(pl.col('activity') / 100) + # Convert the balance, budgeted, and activity columns to decimal + categories_df = categories_df.with_columns(pl.col('balance') / 100) + categories_df = categories_df.with_columns(pl.col('budgeted') / 100) + categories_df = categories_df.with_columns(pl.col('activity') / 100) + except Exception as e: + logging.error(f"Failed to transform the categories DataFrame: {e}") + return # Write the DataFrame to a new parquet file logging.info("Writing the transformed categories DataFrame to parquet file") - categories_df.write_parquet(self.config['warehouse_data_path'] + '/categories.parquet') - + try: + categories_df.write_parquet(self.config['warehouse_data_path'] + '/categories.parquet') + except Exception as e: + logging.error(f"Failed to write the transformed categories DataFrame to parquet file: {e}") + return + class DimPayees(Dimensions): def __init__(self, config): super().__init__(config) @@ -102,22 +129,36 @@ class DimPayees(Dimensions): def transform(self): # Read the parquet file into a polars DataFrame - payees_df = pl.read_parquet(self.file_path) + try: + payees_df = pl.read_parquet(self.file_path) + except Exception as e: + logging.error(f"Failed to read the base payees parquet file: {e}") + return logging.info("Transforming the payees DataFrame") - # Select the required columns - payees_df = payees_df.select([ - 'id', - 'name', - 'deleted' - ]) - # Rename the columns - payees_df = payees_df.with_columns(pl.col('id').alias('payee_id')) - payees_df = payees_df.with_columns(pl.col('name').alias('payee_name')) + try: + payees_df = payees_df.select([ + 'id', + 'name', + 'deleted' + ]) + except Exception as e: + logging.error(f"Failed to select columns from the payees DataFrame: {e}") + return + try: + # Rename the columns + payees_df = payees_df.with_columns(pl.col('id').alias('payee_id')) + payees_df = payees_df.with_columns(pl.col('name').alias('payee_name')) + except Exception as e: + logging.error(f"Failed to rename columns in the payees DataFrame: {e}") + return # Write the DataFrame to a new parquet file logging.info("Writing the transformed payees DataFrame to parquet file") - payees_df.write_parquet(self.config['warehouse_data_path'] + '/payees.parquet') - + try: + payees_df.write_parquet(self.config['warehouse_data_path'] + '/payees.parquet') + except Exception as e: + logging.error(f"Failed to write the transformed payees DataFrame to parquet file: {e}") + return class DimDate(Dimensions): def __init__(self, config): @@ -126,20 +167,35 @@ class DimDate(Dimensions): def transform(self): # Create a DataFrame with dates from 2020-01-01 to 2030-12-31 - dates_df = pl.DataFrame({'date':pl.date_range(date(2020, 1, 1), date(2030, 12, 31), "1d", eager=True)}) - + try: + dates_df = pl.DataFrame({'date':pl.date_range(date(2020, 1, 1), date(2030, 12, 31), "1d", eager=True)}) + except Exception as e: + logging.error(f"Failed to create a DataFrame with dates: {e}") + return # Extract year, month, day, and weekday from the date column - dates_df = dates_df.with_columns([ - pl.col('date').dt.year().alias('year'), - pl.col('date').dt.month().alias('month'), - pl.col('date').dt.day().alias('day'), - pl.col('date').dt.weekday().alias('weekday') - ]) - # Create a new column to indicate if the date is a weekday or weekend - dates_df = dates_df.with_columns([ - (pl.col('weekday') < 5).alias('is_weekday') # True for weekdays (Monday to Friday), False for weekends (Saturday and Sunday) - ]) + try: + dates_df = dates_df.with_columns([ + pl.col('date').dt.year().alias('year'), + pl.col('date').dt.month().alias('month'), + pl.col('date').dt.day().alias('day'), + pl.col('date').dt.weekday().alias('weekday') + ]) + except Exception as e: + logging.error(f"Failed to extract year, month, day, and weekday from the date column: {e}") + return + try: + # Create a new column to indicate if the date is a weekday or weekend + dates_df = dates_df.with_columns([ + (pl.col('weekday') < 5).alias('is_weekday') # True for weekdays (Monday to Friday), False for weekends (Saturday and Sunday) + ]) + except Exception as e: + logging.error(f"Failed to create a new column to indicate if the date is a weekday or weekend: {e}") + return # Write the DataFrame to a new parquet file logging.info("Writing the transformed dates DataFrame to parquet file") - dates_df.write_parquet(self.config['warehouse_data_path'] + '/dates.parquet') + try: + dates_df.write_parquet(self.config['warehouse_data_path'] + '/dates.parquet') + except Exception as e: + logging.error(f"Failed to write the transformed dates DataFrame to parquet file: {e}") + return diff --git a/pipeline/facts.py b/pipeline/facts.py index 8858f8e..7611826 100644 --- a/pipeline/facts.py +++ b/pipeline/facts.py @@ -1,7 +1,6 @@ import polars as pl import logging import os -from datetime import date class Facts: def __init__(self, config): @@ -13,7 +12,6 @@ class Facts: return f"{self.base_file_path}/{file_name}" class FactTransactions(Facts): - def __init__(self, config): super().__init__(config) self.file_path = self.get_full_file_path('transactions.parquet') @@ -21,43 +19,52 @@ class FactTransactions(Facts): def transform(self): # Read the parquet file into a polars DataFrame - transactions_df = pl.read_parquet(self.file_path) - + try: + transactions_df = pl.read_parquet(self.file_path) + except FileNotFoundError: + logging.error("The transactions DataFrame does not exist") + return + # Transform the DataFrame logging.info("Transforming the transactions DataFrame") - transactions_df = ( - transactions_df - .with_columns([ - pl.col("id").alias("transaction_id"), - pl.col("date").alias("transaction_date"), - pl.col("amount").alias("transaction_amount"), - pl.col("memo").alias("transaction_memo"), - pl.col("cleared").alias("transaction_cleared"), - pl.col("approved").alias("transaction_approved"), - pl.col("flag_color").alias("transaction_flag_color"), - pl.col("account_id").alias("account_id"), - pl.col("payee_id").alias("payee_id"), - pl.col("category_id").alias("category_id"), - pl.col("transfer_account_id").alias("transfer_account_id"), - ]) - .with_columns([ - pl.col("memo").fill_null("unknown"), - (pl.col("amount") / 100).alias("transaction_amount"), - ]) - .drop([ - "transfer_transaction_id", "matched_transaction_id", "import_id", - "subtransactions", "deleted","flag_name","account_name", - "payee_name","category_name","import_payee_name","import_payee_name_original", - "debt_transaction_type","ingestion_date" - ]) - ) - + try: + transactions_df = ( + transactions_df + .with_columns([ + pl.col("id").alias("transaction_id"), + pl.col("date").alias("transaction_date"), + pl.col("amount").alias("transaction_amount"), + pl.col("memo").alias("transaction_memo"), + pl.col("cleared").alias("transaction_cleared"), + pl.col("approved").alias("transaction_approved"), + pl.col("flag_color").alias("transaction_flag_color"), + pl.col("account_id").alias("account_id"), + pl.col("payee_id").alias("payee_id"), + pl.col("category_id").alias("category_id"), + pl.col("transfer_account_id").alias("transfer_account_id"), + ]) + .with_columns([ + pl.col("memo").fill_null("unknown"), + (pl.col("amount") / 100).alias("transaction_amount"), + ]) + .drop([ + "transfer_transaction_id", "matched_transaction_id", "import_id", + "subtransactions", "deleted","flag_name","account_name", + "payee_name","category_name","import_payee_name","import_payee_name_original", + "debt_transaction_type","ingestion_date" + ]) + ) + except Exception as e: + logging.error(f"Failed to transform the transactions DataFrame: {e}") + return # Write the DataFrame to a new parquet file logging.info("Writing the transformed transactions DataFrame to parquet file") - transactions_df.write_parquet(self.config['warehouse_data_path'] + '/transactions.parquet') + try: + transactions_df.write_parquet(self.config['warehouse_data_path'] + '/transactions.parquet') + except Exception as e: + logging.error(f"Failed to write the transformed transactions DataFrame: {e}") class FactScheduledTransactions(Facts): - def __init__(self, config): super().__init__(config) self.file_path = self.get_full_file_path('scheduled_transactions.parquet') @@ -73,30 +80,37 @@ class FactScheduledTransactions(Facts): # Transform the DataFrame logging.info("Transforming the scheduled transactions DataFrame") - scheduled_transactions_df = ( - scheduled_transactions_df - .with_columns([ - pl.col("id").alias("scheduled_transaction_id"), - pl.col("date_first").alias("scheduled_transaction_first_date"), - pl.col("date_next").alias("scheduled_transaction_next_date"), - pl.col("frequency").alias("scheduled_transaction_frequency"), - pl.col("amount").alias("scheduled_transaction_amount"), - pl.col("memo").alias("scheduled_transaction_memo"), - pl.col("flag_color").alias("scheduled_transaction_flag_color"), - pl.col("account_id").alias("account_id"), - pl.col("payee_id").alias("payee_id"), - pl.col("category_id").alias("category_id"), - pl.col("transfer_account_id").alias("transfer_account_id"), - ]) - .with_columns([ - pl.col("memo").fill_null("unknown"), - (pl.col("amount") / 100).alias("scheduled_transaction_amount"), - ]) - .drop([ - "subtransactions", "deleted","flag_name","account_name", - "payee_name","category_name","ingestion_date" - ]) - ) + try: + scheduled_transactions_df = ( + scheduled_transactions_df + .with_columns([ + pl.col("id").alias("scheduled_transaction_id"), + pl.col("date_first").alias("scheduled_transaction_first_date"), + pl.col("date_next").alias("scheduled_transaction_next_date"), + pl.col("frequency").alias("scheduled_transaction_frequency"), + pl.col("amount").alias("scheduled_transaction_amount"), + pl.col("memo").alias("scheduled_transaction_memo"), + pl.col("flag_color").alias("scheduled_transaction_flag_color"), + pl.col("account_id").alias("account_id"), + pl.col("payee_id").alias("payee_id"), + pl.col("category_id").alias("category_id"), + pl.col("transfer_account_id").alias("transfer_account_id"), + ]) + .with_columns([ + pl.col("memo").fill_null("unknown"), + (pl.col("amount") / 100).alias("scheduled_transaction_amount"), + ]) + .drop([ + "subtransactions", "deleted","flag_name","account_name", + "payee_name","category_name","ingestion_date" + ]) + ) + except Exception as e: + logging.error(f"Failed to transform the scheduled transactions DataFrame: {e}") + return # Write the DataFrame to a new parquet file logging.info("Writing the transformed scheduled transactions DataFrame to parquet file") - scheduled_transactions_df.write_parquet(self.config['warehouse_data_path'] + '/scheduled_transactions.parquet') + try: + scheduled_transactions_df.write_parquet(self.config['warehouse_data_path'] + '/scheduled_transactions.parquet') + except Exception as e: + logging.error(f"Failed to write the transformed scheduled transactions DataFrame: {e}") diff --git a/pipeline/ingest.py b/pipeline/ingest.py index e49e4bf..6b25b27 100644 --- a/pipeline/ingest.py +++ b/pipeline/ingest.py @@ -3,9 +3,14 @@ import time import json import logging import requests +import sys +import yaml from typing import Dict, Any +import config.exit_codes as ec class Ingest: + + def __init__(self, config: Dict[str, Any]): """ Initialize the Ingest class with the provided configuration. @@ -18,6 +23,8 @@ class Ingest: self.raw_data_path = config['raw_data_path'] self.headers = {'Authorization': f'Bearer {self.api_token}'} self.knowledge_cache = self.load_knowledge_cache() + self.MAX_RETRIES = config['REQUESTS_MAX_RETRIES'] + self.RETRY_DELAY = config['REQUESTS_RETRY_DELAY'] self.fetch_and_cache_entity_data() def load_knowledge_cache(self) -> Dict[str, Any]: @@ -38,8 +45,13 @@ class Ingest: if not os.path.exists(directory): os.makedirs(directory) entity_file = f'{directory}/{current_time}.json' - with open(entity_file, 'w') as f: - json.dump(data, f, indent=4) + logging.info(f"Saving {entity} data to {entity_file}") + try: + with open(entity_file, 'w') as f: + json.dump(data, f, indent=4) + except Exception as e: + logging.error(f"Error saving {entity} data: {e}") + def update_server_knowledge_cache(self, entity: str, server_knowledge: Any): """ @@ -49,8 +61,7 @@ class Ingest: with open(self.knowledge_file, 'r') as f: knowledge_cache = json.load(f) except FileNotFoundError: - # If the file does not exist, create an empty cache - # also create the file so we can save to it later + logging.info(f"Knowledge file not found. Creating a new one at {self.knowledge_file}. This is normal for the first run.") os.makedirs(os.path.dirname(self.knowledge_file), exist_ok=True) knowledge_cache = {} @@ -71,41 +82,74 @@ class Ingest: if remaining_requests < 20: logging.warning("Approaching rate limit. Consider pausing further requests.") # Implement pause or delay logic here if necessary + if remaining_requests == 1: + logging.error("Rate limit exceeded. ending requests here and moving on with what we have.") + return True #returning True here to break out of any more ingestions + else: logging.warning("X-Rate-Limit header is missing.") + + def handle_response(self, response) -> bool: + if response.status_code == 400: + logging.error("Bad request. The request could not be understood by the API due to malformed syntax or validation errors.") + sys.exit(ec.BAD_REQUEST) + elif response.status_code == 401: + logging.error("Unauthorized. Please check your API token.") + sys.exit(ec.UNAUTHORIZED_API_TOKEN) + elif response.status_code == 403: + logging.error("Forbidden. Access is denied.") + sys.exit(ec.FORBIDDEN) + elif response.status_code == 404: + logging.error("Not found. The specified URI does not exist.") + sys.exit(ec.NOT_FOUND) + elif response.status_code == 409: + logging.error("Conflict. The resource cannot be saved due to a conflict.") + sys.exit(ec.CONFLICT) + elif response.status_code == 429: + logging.error("Too many requests. You have made too many requests in a short amount of time.") + return True + elif response.status_code == 500: + logging.error("Internal server error. The API experienced an unexpected error.") + return True + elif response.status_code == 503: + logging.error("Service unavailable. The API is temporarily disabled or a request timeout occurred.") + return True + else: + response.raise_for_status() + return False def fetch_and_cache_entity_data(self): """ Fetch and cache data for all entities. """ for entity in self.entities: - # if we already have files in the raw data folder, we need to skip that entity file_path = f'data/raw/{entity}' if os.path.exists(file_path) and os.listdir(file_path): - logging.warning(f"Skipping entity: {entity} as the raw data folder is not empty.") - continue + logging.warning(f"Raw data exists for {entity} processing any raw data we already have.") + break # break here instead of continue as we dont want to update our server knowledge cache and potentially miss data. + last_knowledge = self.knowledge_cache.get(entity, 0) - logging.debug(f'Last Knowledge of {entity.capitalize()}: {last_knowledge}') - url = f'{self.base_url}/{self.budget_id}/{entity}' - if last_knowledge: - logging.info(f'Fetching {entity} data since last knowledge: {last_knowledge}') - url = url + f'?last_knowledge_of_server={last_knowledge}' - - response = requests.get(url, headers=self.headers) - if response.status_code == 401: - logging.error("Unauthorized. Please check your API token.") - break - - self.check_rate_limit(response) - - if response.status_code == 429: - logging.error("Rate limit exceeded. Pausing until the limit is reset.") - # Implement pause until the limit reset logic here - break + #logging.debug(f'Last Knowledge of {entity}: {last_knowledge}') + logging.info(f'Fetching {entity} data since last knowledge: {last_knowledge}') + url = f'{self.base_url}/{self.budget_id}/{entity}?last_knowledge_of_server={last_knowledge}' + + for attempt in range(self.MAX_RETRIES): + try: + response = requests.get(url, headers=self.headers) + should_retry = self.handle_response(response) + if not should_retry: + break # Exit the loop if the request is successful + except requests.exceptions.RequestException as e: + logging.error(f"Error fetching {entity} data (attempt {attempt + 1}/{self.MAX_RETRIES}): {e}") + if attempt < self.MAX_RETRIES - 1: + time.sleep(self.RETRY_DELAY) # Wait before retrying + else: + logging.error("Max retries reached. Exiting.") + sys.exit(ec.REQUESTS_ERROR) data = response.json() server_knowledge = data['data'].get('server_knowledge') - logging.debug(f'{entity.capitalize()} Server Knowledge: {server_knowledge}') + logging.debug(f'{entity} new server knowledge: {server_knowledge}') if server_knowledge is not None and server_knowledge != last_knowledge: self.update_server_knowledge_cache(entity, server_knowledge) @@ -113,4 +157,7 @@ class Ingest: entity_data.pop('server_knowledge', None) self.save_entity_data_to_raw(entity, entity_data) else: - logging.info(f"No new data for {entity}. Skipping cache update.") \ No newline at end of file + logging.info(f"No new data for {entity}. Skipping cache update.") + + if self.check_rate_limit(response): + break # break out here and continue processing the data we have. diff --git a/pipeline/raw_to_base.py b/pipeline/raw_to_base.py index a4e2c7a..1e2072a 100644 --- a/pipeline/raw_to_base.py +++ b/pipeline/raw_to_base.py @@ -1,8 +1,10 @@ import os import json import logging +import sys from datetime import datetime -from typing import List, Dict, Any +from typing import Dict, Any +import config.exit_codes as ec import polars as pl class RawToBase: @@ -14,15 +16,13 @@ class RawToBase: self.base_data_path = config['base_data_path'] self.data = {} self.base_data = {} - logging.basicConfig(level=logging.DEBUG) self.process_entities() - + def process_entities(self): for entity in self.entities: # check the file is in the raw data path, if not skip the entity folder_path = os.path.join(self.raw_data_path, entity) folder_contents = os.listdir(folder_path) - # Check if the folder is empty if not folder_contents: logging.warning(f"The folder {folder_path} is empty skipping {entity}.") continue @@ -31,61 +31,94 @@ class RawToBase: continue self._load_existing_base_data(entity) self._combine_data(entity) - self._resolve_duplicates(entity) - self._save_base_data(entity) - self._move_raw_to_processed(entity) + if not self._resolve_duplicates(entity): + logging.error(f"entity: {entity} failed duplicate resolution.") + sys.exit(ec.DUPLICATE_RESOLUTION_ERROR) + if not self._save_base_data(entity): + logging.error(f"Skipping processing for entity: {entity} due to failed saving base data.") + continue + if not self._move_raw_to_processed(entity): + logging.error(f"entity: {entity} has been processed, but we could not move the file out of the raw folder, please clear the raw folder for {entity}.") + sys.exit(ec.MOVE_FILE_ERROR) def _load_raw_data(self, entity): entity_path = os.path.join(self.raw_data_path, entity) self.data[entity] = [] logging.debug(f"Loading data for entity: {entity} from path: {entity_path}") - for file_name in os.listdir(entity_path): - if file_name.endswith('.json'): - file_path = os.path.join(entity_path, file_name) - logging.debug(f"Reading file: {file_path}") - try: - with open(file_path, 'r') as f: - data = json.load(f) - # Check if the data is empty - if entity == "categories": - # Check if any category group has categories - has_categories = any(group.get("categories") for group in data.get("category_groups", [])) - if not has_categories: - logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.") - os.remove(file_path) - return False - else: - if not data.get(entity, []): - logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.") - # delete the file as it is empty - os.remove(file_path) - return False - modified_data = [] - if entity == 'categories': - for group in data.get('category_groups', []): - for category in group.get('categories', []): - category['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date() - modified_data.append(category) - else: - for record in data.get(f'{entity}', []): - if isinstance(record, dict): - record['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date() - modified_data.append(record) - else: - modified_data.append({'record': record, 'ingestion_date': datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()}) - self.data[entity].append(modified_data) - logging.debug(f"Successfully loaded data from file: {file_path}") - except Exception as e: - logging.error(f"Failed to load data from file: {file_path}, error: {e}") - exit(1) + files = [f for f in os.listdir(entity_path) if f.endswith('.json')] + + if len(files) > 1: + logging.error(f"""More than one file found in path: {entity_path}. Skipping processing for entity: {entity}. +recommended actions is to move the newest file(s) out, re-run main.py. +Then move the files back in one at a time oldest to newest and run again for each file""") + return False + + if len(files) == 1: + file_name = files[0] + file_path = os.path.join(entity_path, file_name) + logging.debug(f"Reading file: {file_path}") + try: + with open(file_path, 'r') as f: + data = json.load(f) + except Exception as e: + logging.error(f"Failed to load data from file: {file_path}, error: {e}") + return False + + if self._is_data_empty(entity, data, file_path): + return False + + modified_data = self._add_ingestion_date(entity, data, file_name) + + self.data[entity].append(modified_data) + logging.debug(f"Successfully loaded data from file: {file_path}") return True + def _is_data_empty(self, entity, data, file_path): + logging.debug(f"Checking if data is empty for entity: {entity}") + if entity == "categories": + has_categories = any(group.get("categories") for group in data.get("category_groups", [])) + if not has_categories: + logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.") + os.remove(file_path) + return True + else: + if not data.get(entity, []): + logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.") + os.remove(file_path) + return True + logging.debug(f"Data is not empty for entity: {entity}") + return False + + def _add_ingestion_date(self, entity, data, file_name): + modified_data = [] + ingestion_date = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date() + + logging.debug(f"Adding ingestion date to data for entity: {entity}") + if entity == 'categories': + for group in data.get('category_groups', []): + for category in group.get('categories', []): + category['ingestion_date'] = ingestion_date + modified_data.append(category) + else: + for record in data.get(f'{entity}', []): + if isinstance(record, dict): + record['ingestion_date'] = ingestion_date + modified_data.append(record) + else: + modified_data.append({'record': record, 'ingestion_date': ingestion_date}) + logging.debug(f"Successfully added ingestion date to data for entity: {entity}") + return modified_data + def _load_existing_base_data(self, entity): base_path = os.path.join(self.base_data_path, f'{entity}.parquet') if os.path.exists(base_path): logging.debug(f"Loading existing base data for entity: {entity} from path: {base_path}") - self.base_data[entity] = pl.read_parquet(base_path) + try: + self.base_data[entity] = pl.read_parquet(base_path) + except Exception as e: + logging.error(f"Failed to load existing base data for entity: {entity}, error: {e}, Creating an empty DataFrame") + self.base_data[entity] = pl.DataFrame() logging.debug(f"Successfully loaded existing base data for entity: {entity}") else: self.base_data[entity] = pl.DataFrame() @@ -103,7 +136,6 @@ class RawToBase: combined_data.extend(data) new_data_df = pl.DataFrame(combined_data) - #print(new_data_df) # Ensure the unique id column is preserved unique_id = self.primary_keys[entity]['unique_id'] @@ -117,35 +149,52 @@ class RawToBase: def _resolve_duplicates(self, entity): logging.debug(f"Resolving duplicates for entity: {entity}") unique_id = self.primary_keys[entity]['unique_id'] - self.base_data[entity] = self.base_data[entity].sort(by='ingestion_date').unique(subset=unique_id, keep='first') + try: + self.base_data[entity] = self.base_data[entity].sort(by='ingestion_date').unique(subset=unique_id, keep='first') + except Exception as e: + logging.error(f"Failed to resolve duplicates for entity: {entity}, error: {e}") + return False logging.debug(f"Successfully resolved duplicates for entity: {entity}") + return True def _save_base_data(self, entity): os.makedirs(self.base_data_path, exist_ok=True) file_path = os.path.join(self.base_data_path, f'{entity}.parquet') - self.base_data[entity].write_parquet(file_path) + try: + self.base_data[entity].write_parquet(file_path) + except Exception as e: + logging.error(f"Failed to save base data for entity: {entity}, error: {e}") + return False logging.debug(f"Saved base data for entity: {entity} to path: {file_path}") - + return True + def _move_raw_to_processed(self, entity): raw_entity_path = os.path.join(self.raw_data_path, entity) processed_path = os.path.join(self.processed_data_path, entity) - # logging.debug(f"Raw entity path: {raw_entity_path}") - # logging.debug(f"Processed path: {processed_path}") - os.makedirs(processed_path, exist_ok=True) - for file_name in os.listdir(raw_entity_path): - if file_name.endswith('.json'): - raw_file_path = os.path.join(raw_entity_path, file_name) - processed_file_path = os.path.join(processed_path, file_name) - - logging.debug(f"Moving file: {raw_file_path} to {processed_file_path}") - - if os.path.exists(raw_file_path): - os.rename(raw_file_path, processed_file_path) - logging.debug(f"Moved file: {file_name}") - else: - logging.error(f"File not found: {raw_file_path}") + try: + files = [f for f in os.listdir(raw_entity_path) if f.endswith('.json')] + if len(files) != 1: + logging.error(f"Expected exactly one file in path: {raw_entity_path}, but found {len(files)}") + return False + + file_name = files[0] + raw_file_path = os.path.join(raw_entity_path, file_name) + processed_file_path = os.path.join(processed_path, file_name) + + logging.debug(f"Moving file: {raw_file_path} to {processed_file_path}") + + os.rename(raw_file_path, processed_file_path) + logging.debug(f"Moved file: {file_name} to processed") - logging.debug(f"Moved processed files for entity: {entity} to path: {processed_path}") \ No newline at end of file + except FileNotFoundError as e: + logging.error(f"File not found: {e}") + return False + except Exception as e: + logging.error(f"Failed to move file for entity: {entity}, error: {e}") + return False + + logging.debug(f"Moved processed file for entity: {entity} to path: {processed_path}") + return True \ No newline at end of file