Merge pull request #1 from Jake-Pullen/feature/better_error_handling

Feature/better error handling
This commit is contained in:
Jake-Pullen
2024-08-10 19:48:22 +01:00
committed by GitHub
11 changed files with 550 additions and 237 deletions
+1
View File
@@ -7,3 +7,4 @@ data/*
__pycache__/* __pycache__/*
*/__pycache__/* */__pycache__/*
*.pbix *.pbix
/logs/*
View File
+2
View File
@@ -24,3 +24,5 @@ raw_data_path: data/raw
processed_data_path: data/processed processed_data_path: data/processed
base_data_path: data/base base_data_path: data/base
warehouse_data_path: data/warehouse warehouse_data_path: data/warehouse
REQUESTS_MAX_RETRIES: 3
REQUESTS_RETRY_DELAY: 5
+41
View File
@@ -0,0 +1,41 @@
import datetime as dt
import json
import logging
from typing import override
class custom_json_logger(logging.Formatter):
def __init__(
self,
*,
format_keys: dict[str,str] | None = None,
):
super().__init__()
self.format_keys = format_keys if format_keys is not None else {}
@override
def format(self, record: logging.LogRecord) -> str:
record_dict = self._prepare_log_dict(record)
return json.dumps(record_dict, default=str)
def _prepare_log_dict(self, record: logging.LogRecord) -> dict:
always_fields = {
"message" : record.getMessage(),
"timestamp" : dt.datetime.fromtimestamp(
record.created, tz=dt.timezone.utc
).isoformat(),
}
if record.exc_info is not None:
always_fields["exc_info"] = self.formatException(record.exc_info)
if record.stack_info is not None:
always_fields["stack_info"] = self.formatStack(record.stack_info)
message = {
key: msg_val
if (msg_val := always_fields.pop(val, None)) is not None
else getattr(record, val)
for key, val in self.format_keys.items()
}
message.update(always_fields)
return message
+12
View File
@@ -0,0 +1,12 @@
SUCCESS = 0
MISSING_ENV_VARS = 1
MISSING_CONFIG_FILE = 2
CORRUPTED_CONFIG_FILE = 3
UNAUTHORIZED_API_TOKEN = 4
REQUESTS_ERROR = 5
BAD_REQUEST = 6
FORBIDDEN = 7
NOT_FOUND = 8
CONFLICT = 9
MOVE_FILE_ERROR = 10
DUPLICATE_RESOLUTION_ERROR = 11
+41
View File
@@ -0,0 +1,41 @@
version: 1
disable_existing_loggers: False
formatters:
simple:
format: "%(asctime)s - %(levelname)s - %(module)s - %(funcName)s - %(message)s"
datefmt: "%Y-%m-%d %H:%M:%S%z"
json:
"()": config.custom_json_logger.custom_json_logger
format_keys:
level: levelname
timestamp: timestamp
logger: name
module: module
function: funcName
line: lineno
message: message
thread_name: threadName
handlers:
stderr:
class: logging.StreamHandler
level: INFO
formatter: simple
stream: ext://sys.stdout
file:
class: logging.handlers.RotatingFileHandler
level: DEBUG
formatter: json
filename: logs/dpfy_log.jsonl
maxBytes: 10485760 # 10MB
backupCount: 10
queue_handler:
class: logging.handlers.QueueHandler
handlers:
- stderr
- file
respect_handler_level: True
loggers:
root:
level: DEBUG
handlers:
- queue_handler
+56 -6
View File
@@ -2,25 +2,61 @@ import os
import dotenv import dotenv
import logging import logging
import yaml import yaml
import sys
import atexit
import logging.config
import logging.handlers
import config.exit_codes as ec
from pipeline.ingest import Ingest from pipeline.ingest import Ingest
from pipeline.raw_to_base import RawToBase from pipeline.raw_to_base import RawToBase
from pipeline.dimensions import DimAccounts, DimCategories, DimPayees, DimDate from pipeline.dimensions import DimAccounts, DimCategories, DimPayees, DimDate
from pipeline.facts import FactTransactions, FactScheduledTransactions from pipeline.facts import FactTransactions, FactScheduledTransactions
def set_up_logging():
try:
with open('config/logging_config.yaml', 'r') as f:
log_config = yaml.safe_load(f)
logging.config.dictConfig(log_config)
except yaml.YAMLError as e:
print(f"Error parsing logging configuration file: {e}")
log_config = {} # Initialize log_config to an empty dictionary
logging.basicConfig(level=logging.INFO) # Fallback to a basic configuration
queue_handler = logging.getHandlerByName('queue_handler')
if queue_handler is not None:
queue_handler.listener.start()
atexit.register(queue_handler.listener.stop)
logger = logging.getLogger("data_pipeline_for_ynab")
os.makedirs('logs', exist_ok=True)
set_up_logging()
# Load environment variables
dotenv.load_dotenv() dotenv.load_dotenv()
API_TOKEN = os.getenv('API_TOKEN') API_TOKEN = os.getenv('API_TOKEN')
BUDGET_ID = os.getenv('BUDGET_ID') BUDGET_ID = os.getenv('BUDGET_ID')
logging.basicConfig(level=logging.DEBUG)
with open('config.yaml', 'r') as file: def main():
config = yaml.safe_load(file) if not API_TOKEN or not BUDGET_ID:
logging.error('API_TOKEN or BUDGET_ID is not set in .env file')
sys.exit(ec.MISSING_ENV_VARS)
config['API_TOKEN'] = API_TOKEN try:
config['BUDGET_ID'] = BUDGET_ID with open('config/config.yaml', 'r') as file:
config = yaml.safe_load(file)
except FileNotFoundError:
logging.error('config.yaml file not found')
sys.exit(ec.MISSING_CONFIG_FILE)
except yaml.YAMLError as e:
logging.error(f'Error loading config.yaml: {e}')
sys.exit(ec.CORRUPTED_CONFIG_FILE)
config['API_TOKEN'] = API_TOKEN
config['BUDGET_ID'] = BUDGET_ID
logging.info('Starting data pipeline')
if __name__ == '__main__':
Ingest(config) Ingest(config)
RawToBase(config) RawToBase(config)
DimAccounts(config) DimAccounts(config)
@@ -29,3 +65,17 @@ if __name__ == '__main__':
DimDate(config) DimDate(config)
FactTransactions(config) FactTransactions(config)
FactScheduledTransactions(config) FactScheduledTransactions(config)
logging.info('Data pipeline completed successfully')
sys.exit(ec.SUCCESS)
if __name__ == '__main__':
try:
main()
except SystemExit as e:
exit_code = e.code
if exit_code == ec.SUCCESS:
logging.info('Program exited successfully')
else:
logging.error(f'Program exited with code {exit_code}')
raise
+133 -77
View File
@@ -21,40 +21,51 @@ class DimAccounts(Dimensions):
def transform(self): def transform(self):
# Read the parquet file into a polars DataFrame # Read the parquet file into a polars DataFrame
accounts_df = pl.read_parquet(self.file_path) try:
accounts_df = pl.read_parquet(self.file_path)
except Exception as e:
logging.error(f"Failed to read the base accounts parquet file: {e}")
return
# Transform the DataFrame # Transform the DataFrame
logging.info("Transforming the accounts DataFrame") logging.info("Transforming the accounts DataFrame")
accounts_df = ( try:
accounts_df accounts_df = (
.with_columns([ accounts_df
pl.col("id").alias("account_id"), .with_columns([
pl.col("name").alias("account_name"), pl.col("id").alias("account_id"),
pl.col("type").alias("account_type"), pl.col("name").alias("account_name"),
pl.col("on_budget").alias("on_budget"), pl.col("type").alias("account_type"),
pl.col("closed").alias("closed"), pl.col("on_budget").alias("on_budget"),
pl.col("note").alias("note"), pl.col("closed").alias("closed"),
pl.col("balance").alias("balance"), pl.col("note").alias("note"),
pl.col("cleared_balance").alias("cleared_balance"), pl.col("balance").alias("balance"),
pl.col("uncleared_balance").alias("uncleared_balance"), pl.col("cleared_balance").alias("cleared_balance"),
pl.col("deleted").alias("deleted"), pl.col("uncleared_balance").alias("uncleared_balance"),
]) pl.col("deleted").alias("deleted"),
.with_columns([ ])
pl.col("note").fill_null("unknown"), .with_columns([
(pl.col("balance") / 100).alias("balance"), pl.col("note").fill_null("unknown"),
(pl.col("cleared_balance") / 100).alias("cleared_balance"), (pl.col("balance") / 100).alias("balance"),
(pl.col("uncleared_balance") / 100).alias("uncleared_balance"), (pl.col("cleared_balance") / 100).alias("cleared_balance"),
]) (pl.col("uncleared_balance") / 100).alias("uncleared_balance"),
.drop([ ])
"transfer_payee_id", "direct_import_linked", "direct_import_in_error", .drop([
"last_reconciled_at", "debt_original_balance", "debt_interest_rates", "transfer_payee_id", "direct_import_linked", "direct_import_in_error",
"debt_minimum_payments", "debt_escrow_amounts", "ingestion_date" "last_reconciled_at", "debt_original_balance", "debt_interest_rates",
]) "debt_minimum_payments", "debt_escrow_amounts", "ingestion_date"
) ])
)
except Exception as e:
logging.error(f"Failed to transform the accounts DataFrame: {e}")
return
# Write the DataFrame to a new parquet file # Write the DataFrame to a new parquet file
logging.info("Writing the transformed accounts DataFrame to parquet file") logging.info("Writing the transformed accounts DataFrame to parquet file")
accounts_df.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet') try:
accounts_df.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet')
except Exception as e:
logging.error(f"Failed to write the transformed accounts DataFrame to parquet file: {e}")
return
class DimCategories(Dimensions): class DimCategories(Dimensions):
def __init__(self, config): def __init__(self, config):
@@ -64,35 +75,51 @@ class DimCategories(Dimensions):
def transform(self): def transform(self):
# Read the parquet file into a polars DataFrame # Read the parquet file into a polars DataFrame
categories_df = pl.read_parquet(self.file_path) try:
categories_df = pl.read_parquet(self.file_path)
except Exception as e:
logging.error(f"Failed to read the base categories parquet file: {e}")
return
logging.info("Transforming the categories DataFrame") logging.info("Transforming the categories DataFrame")
# Select the required columns try:
categories_df = categories_df.select([ categories_df = categories_df.select([
'id', 'id',
'name', 'name',
'category_group_name', 'category_group_name',
'hidden', 'hidden',
'note', 'note',
'budgeted', 'budgeted',
'activity', 'activity',
'balance', 'balance',
'deleted' 'deleted'
]) ])
# Rename the columns except Exception as e:
categories_df = categories_df.with_columns(pl.col('id').alias('category_id')) logging.error(f"Failed to select columns from the categories DataFrame: {e}")
categories_df = categories_df.with_columns(pl.col('name').alias('category_name')) return
# Fill null values in the note column try:
categories_df = categories_df.with_columns(pl.col('note').fill_null('unknown')) # Rename the columns
categories_df = categories_df.with_columns(pl.col('id').alias('category_id'))
categories_df = categories_df.with_columns(pl.col('name').alias('category_name'))
# Convert the balance, budgeted, and activity columns to decimal # Fill null values in the note column
categories_df = categories_df.with_columns(pl.col('balance') / 100) categories_df = categories_df.with_columns(pl.col('note').fill_null('unknown'))
categories_df = categories_df.with_columns(pl.col('budgeted') / 100)
categories_df = categories_df.with_columns(pl.col('activity') / 100) # Convert the balance, budgeted, and activity columns to decimal
categories_df = categories_df.with_columns(pl.col('balance') / 100)
categories_df = categories_df.with_columns(pl.col('budgeted') / 100)
categories_df = categories_df.with_columns(pl.col('activity') / 100)
except Exception as e:
logging.error(f"Failed to transform the categories DataFrame: {e}")
return
# Write the DataFrame to a new parquet file # Write the DataFrame to a new parquet file
logging.info("Writing the transformed categories DataFrame to parquet file") logging.info("Writing the transformed categories DataFrame to parquet file")
categories_df.write_parquet(self.config['warehouse_data_path'] + '/categories.parquet') try:
categories_df.write_parquet(self.config['warehouse_data_path'] + '/categories.parquet')
except Exception as e:
logging.error(f"Failed to write the transformed categories DataFrame to parquet file: {e}")
return
class DimPayees(Dimensions): class DimPayees(Dimensions):
def __init__(self, config): def __init__(self, config):
@@ -102,22 +129,36 @@ class DimPayees(Dimensions):
def transform(self): def transform(self):
# Read the parquet file into a polars DataFrame # Read the parquet file into a polars DataFrame
payees_df = pl.read_parquet(self.file_path) try:
payees_df = pl.read_parquet(self.file_path)
except Exception as e:
logging.error(f"Failed to read the base payees parquet file: {e}")
return
logging.info("Transforming the payees DataFrame") logging.info("Transforming the payees DataFrame")
# Select the required columns try:
payees_df = payees_df.select([ payees_df = payees_df.select([
'id', 'id',
'name', 'name',
'deleted' 'deleted'
]) ])
# Rename the columns except Exception as e:
payees_df = payees_df.with_columns(pl.col('id').alias('payee_id')) logging.error(f"Failed to select columns from the payees DataFrame: {e}")
payees_df = payees_df.with_columns(pl.col('name').alias('payee_name')) return
try:
# Rename the columns
payees_df = payees_df.with_columns(pl.col('id').alias('payee_id'))
payees_df = payees_df.with_columns(pl.col('name').alias('payee_name'))
except Exception as e:
logging.error(f"Failed to rename columns in the payees DataFrame: {e}")
return
# Write the DataFrame to a new parquet file # Write the DataFrame to a new parquet file
logging.info("Writing the transformed payees DataFrame to parquet file") logging.info("Writing the transformed payees DataFrame to parquet file")
payees_df.write_parquet(self.config['warehouse_data_path'] + '/payees.parquet') try:
payees_df.write_parquet(self.config['warehouse_data_path'] + '/payees.parquet')
except Exception as e:
logging.error(f"Failed to write the transformed payees DataFrame to parquet file: {e}")
return
class DimDate(Dimensions): class DimDate(Dimensions):
def __init__(self, config): def __init__(self, config):
@@ -126,20 +167,35 @@ class DimDate(Dimensions):
def transform(self): def transform(self):
# Create a DataFrame with dates from 2020-01-01 to 2030-12-31 # Create a DataFrame with dates from 2020-01-01 to 2030-12-31
dates_df = pl.DataFrame({'date':pl.date_range(date(2020, 1, 1), date(2030, 12, 31), "1d", eager=True)}) try:
dates_df = pl.DataFrame({'date':pl.date_range(date(2020, 1, 1), date(2030, 12, 31), "1d", eager=True)})
except Exception as e:
logging.error(f"Failed to create a DataFrame with dates: {e}")
return
# Extract year, month, day, and weekday from the date column # Extract year, month, day, and weekday from the date column
dates_df = dates_df.with_columns([ try:
pl.col('date').dt.year().alias('year'), dates_df = dates_df.with_columns([
pl.col('date').dt.month().alias('month'), pl.col('date').dt.year().alias('year'),
pl.col('date').dt.day().alias('day'), pl.col('date').dt.month().alias('month'),
pl.col('date').dt.weekday().alias('weekday') pl.col('date').dt.day().alias('day'),
]) pl.col('date').dt.weekday().alias('weekday')
# Create a new column to indicate if the date is a weekday or weekend ])
dates_df = dates_df.with_columns([ except Exception as e:
(pl.col('weekday') < 5).alias('is_weekday') # True for weekdays (Monday to Friday), False for weekends (Saturday and Sunday) logging.error(f"Failed to extract year, month, day, and weekday from the date column: {e}")
]) return
try:
# Create a new column to indicate if the date is a weekday or weekend
dates_df = dates_df.with_columns([
(pl.col('weekday') < 5).alias('is_weekday') # True for weekdays (Monday to Friday), False for weekends (Saturday and Sunday)
])
except Exception as e:
logging.error(f"Failed to create a new column to indicate if the date is a weekday or weekend: {e}")
return
# Write the DataFrame to a new parquet file # Write the DataFrame to a new parquet file
logging.info("Writing the transformed dates DataFrame to parquet file") logging.info("Writing the transformed dates DataFrame to parquet file")
dates_df.write_parquet(self.config['warehouse_data_path'] + '/dates.parquet') try:
dates_df.write_parquet(self.config['warehouse_data_path'] + '/dates.parquet')
except Exception as e:
logging.error(f"Failed to write the transformed dates DataFrame to parquet file: {e}")
return
+71 -57
View File
@@ -1,7 +1,6 @@
import polars as pl import polars as pl
import logging import logging
import os import os
from datetime import date
class Facts: class Facts:
def __init__(self, config): def __init__(self, config):
@@ -13,7 +12,6 @@ class Facts:
return f"{self.base_file_path}/{file_name}" return f"{self.base_file_path}/{file_name}"
class FactTransactions(Facts): class FactTransactions(Facts):
def __init__(self, config): def __init__(self, config):
super().__init__(config) super().__init__(config)
self.file_path = self.get_full_file_path('transactions.parquet') self.file_path = self.get_full_file_path('transactions.parquet')
@@ -21,43 +19,52 @@ class FactTransactions(Facts):
def transform(self): def transform(self):
# Read the parquet file into a polars DataFrame # Read the parquet file into a polars DataFrame
transactions_df = pl.read_parquet(self.file_path) try:
transactions_df = pl.read_parquet(self.file_path)
except FileNotFoundError:
logging.error("The transactions DataFrame does not exist")
return
# Transform the DataFrame # Transform the DataFrame
logging.info("Transforming the transactions DataFrame") logging.info("Transforming the transactions DataFrame")
transactions_df = ( try:
transactions_df transactions_df = (
.with_columns([ transactions_df
pl.col("id").alias("transaction_id"), .with_columns([
pl.col("date").alias("transaction_date"), pl.col("id").alias("transaction_id"),
pl.col("amount").alias("transaction_amount"), pl.col("date").alias("transaction_date"),
pl.col("memo").alias("transaction_memo"), pl.col("amount").alias("transaction_amount"),
pl.col("cleared").alias("transaction_cleared"), pl.col("memo").alias("transaction_memo"),
pl.col("approved").alias("transaction_approved"), pl.col("cleared").alias("transaction_cleared"),
pl.col("flag_color").alias("transaction_flag_color"), pl.col("approved").alias("transaction_approved"),
pl.col("account_id").alias("account_id"), pl.col("flag_color").alias("transaction_flag_color"),
pl.col("payee_id").alias("payee_id"), pl.col("account_id").alias("account_id"),
pl.col("category_id").alias("category_id"), pl.col("payee_id").alias("payee_id"),
pl.col("transfer_account_id").alias("transfer_account_id"), pl.col("category_id").alias("category_id"),
]) pl.col("transfer_account_id").alias("transfer_account_id"),
.with_columns([ ])
pl.col("memo").fill_null("unknown"), .with_columns([
(pl.col("amount") / 100).alias("transaction_amount"), pl.col("memo").fill_null("unknown"),
]) (pl.col("amount") / 100).alias("transaction_amount"),
.drop([ ])
"transfer_transaction_id", "matched_transaction_id", "import_id", .drop([
"subtransactions", "deleted","flag_name","account_name", "transfer_transaction_id", "matched_transaction_id", "import_id",
"payee_name","category_name","import_payee_name","import_payee_name_original", "subtransactions", "deleted","flag_name","account_name",
"debt_transaction_type","ingestion_date" "payee_name","category_name","import_payee_name","import_payee_name_original",
]) "debt_transaction_type","ingestion_date"
) ])
)
except Exception as e:
logging.error(f"Failed to transform the transactions DataFrame: {e}")
return
# Write the DataFrame to a new parquet file # Write the DataFrame to a new parquet file
logging.info("Writing the transformed transactions DataFrame to parquet file") logging.info("Writing the transformed transactions DataFrame to parquet file")
transactions_df.write_parquet(self.config['warehouse_data_path'] + '/transactions.parquet') try:
transactions_df.write_parquet(self.config['warehouse_data_path'] + '/transactions.parquet')
except Exception as e:
logging.error(f"Failed to write the transformed transactions DataFrame: {e}")
class FactScheduledTransactions(Facts): class FactScheduledTransactions(Facts):
def __init__(self, config): def __init__(self, config):
super().__init__(config) super().__init__(config)
self.file_path = self.get_full_file_path('scheduled_transactions.parquet') self.file_path = self.get_full_file_path('scheduled_transactions.parquet')
@@ -73,30 +80,37 @@ class FactScheduledTransactions(Facts):
# Transform the DataFrame # Transform the DataFrame
logging.info("Transforming the scheduled transactions DataFrame") logging.info("Transforming the scheduled transactions DataFrame")
scheduled_transactions_df = ( try:
scheduled_transactions_df scheduled_transactions_df = (
.with_columns([ scheduled_transactions_df
pl.col("id").alias("scheduled_transaction_id"), .with_columns([
pl.col("date_first").alias("scheduled_transaction_first_date"), pl.col("id").alias("scheduled_transaction_id"),
pl.col("date_next").alias("scheduled_transaction_next_date"), pl.col("date_first").alias("scheduled_transaction_first_date"),
pl.col("frequency").alias("scheduled_transaction_frequency"), pl.col("date_next").alias("scheduled_transaction_next_date"),
pl.col("amount").alias("scheduled_transaction_amount"), pl.col("frequency").alias("scheduled_transaction_frequency"),
pl.col("memo").alias("scheduled_transaction_memo"), pl.col("amount").alias("scheduled_transaction_amount"),
pl.col("flag_color").alias("scheduled_transaction_flag_color"), pl.col("memo").alias("scheduled_transaction_memo"),
pl.col("account_id").alias("account_id"), pl.col("flag_color").alias("scheduled_transaction_flag_color"),
pl.col("payee_id").alias("payee_id"), pl.col("account_id").alias("account_id"),
pl.col("category_id").alias("category_id"), pl.col("payee_id").alias("payee_id"),
pl.col("transfer_account_id").alias("transfer_account_id"), pl.col("category_id").alias("category_id"),
]) pl.col("transfer_account_id").alias("transfer_account_id"),
.with_columns([ ])
pl.col("memo").fill_null("unknown"), .with_columns([
(pl.col("amount") / 100).alias("scheduled_transaction_amount"), pl.col("memo").fill_null("unknown"),
]) (pl.col("amount") / 100).alias("scheduled_transaction_amount"),
.drop([ ])
"subtransactions", "deleted","flag_name","account_name", .drop([
"payee_name","category_name","ingestion_date" "subtransactions", "deleted","flag_name","account_name",
]) "payee_name","category_name","ingestion_date"
) ])
)
except Exception as e:
logging.error(f"Failed to transform the scheduled transactions DataFrame: {e}")
return
# Write the DataFrame to a new parquet file # Write the DataFrame to a new parquet file
logging.info("Writing the transformed scheduled transactions DataFrame to parquet file") logging.info("Writing the transformed scheduled transactions DataFrame to parquet file")
scheduled_transactions_df.write_parquet(self.config['warehouse_data_path'] + '/scheduled_transactions.parquet') try:
scheduled_transactions_df.write_parquet(self.config['warehouse_data_path'] + '/scheduled_transactions.parquet')
except Exception as e:
logging.error(f"Failed to write the transformed scheduled transactions DataFrame: {e}")
+71 -24
View File
@@ -3,9 +3,14 @@ import time
import json import json
import logging import logging
import requests import requests
import sys
import yaml
from typing import Dict, Any from typing import Dict, Any
import config.exit_codes as ec
class Ingest: class Ingest:
def __init__(self, config: Dict[str, Any]): def __init__(self, config: Dict[str, Any]):
""" """
Initialize the Ingest class with the provided configuration. Initialize the Ingest class with the provided configuration.
@@ -18,6 +23,8 @@ class Ingest:
self.raw_data_path = config['raw_data_path'] self.raw_data_path = config['raw_data_path']
self.headers = {'Authorization': f'Bearer {self.api_token}'} self.headers = {'Authorization': f'Bearer {self.api_token}'}
self.knowledge_cache = self.load_knowledge_cache() self.knowledge_cache = self.load_knowledge_cache()
self.MAX_RETRIES = config['REQUESTS_MAX_RETRIES']
self.RETRY_DELAY = config['REQUESTS_RETRY_DELAY']
self.fetch_and_cache_entity_data() self.fetch_and_cache_entity_data()
def load_knowledge_cache(self) -> Dict[str, Any]: def load_knowledge_cache(self) -> Dict[str, Any]:
@@ -38,8 +45,13 @@ class Ingest:
if not os.path.exists(directory): if not os.path.exists(directory):
os.makedirs(directory) os.makedirs(directory)
entity_file = f'{directory}/{current_time}.json' entity_file = f'{directory}/{current_time}.json'
with open(entity_file, 'w') as f: logging.info(f"Saving {entity} data to {entity_file}")
json.dump(data, f, indent=4) try:
with open(entity_file, 'w') as f:
json.dump(data, f, indent=4)
except Exception as e:
logging.error(f"Error saving {entity} data: {e}")
def update_server_knowledge_cache(self, entity: str, server_knowledge: Any): def update_server_knowledge_cache(self, entity: str, server_knowledge: Any):
""" """
@@ -49,8 +61,7 @@ class Ingest:
with open(self.knowledge_file, 'r') as f: with open(self.knowledge_file, 'r') as f:
knowledge_cache = json.load(f) knowledge_cache = json.load(f)
except FileNotFoundError: except FileNotFoundError:
# If the file does not exist, create an empty cache logging.info(f"Knowledge file not found. Creating a new one at {self.knowledge_file}. This is normal for the first run.")
# also create the file so we can save to it later
os.makedirs(os.path.dirname(self.knowledge_file), exist_ok=True) os.makedirs(os.path.dirname(self.knowledge_file), exist_ok=True)
knowledge_cache = {} knowledge_cache = {}
@@ -71,41 +82,74 @@ class Ingest:
if remaining_requests < 20: if remaining_requests < 20:
logging.warning("Approaching rate limit. Consider pausing further requests.") logging.warning("Approaching rate limit. Consider pausing further requests.")
# Implement pause or delay logic here if necessary # Implement pause or delay logic here if necessary
if remaining_requests == 1:
logging.error("Rate limit exceeded. ending requests here and moving on with what we have.")
return True #returning True here to break out of any more ingestions
else: else:
logging.warning("X-Rate-Limit header is missing.") logging.warning("X-Rate-Limit header is missing.")
def handle_response(self, response) -> bool:
if response.status_code == 400:
logging.error("Bad request. The request could not be understood by the API due to malformed syntax or validation errors.")
sys.exit(ec.BAD_REQUEST)
elif response.status_code == 401:
logging.error("Unauthorized. Please check your API token.")
sys.exit(ec.UNAUTHORIZED_API_TOKEN)
elif response.status_code == 403:
logging.error("Forbidden. Access is denied.")
sys.exit(ec.FORBIDDEN)
elif response.status_code == 404:
logging.error("Not found. The specified URI does not exist.")
sys.exit(ec.NOT_FOUND)
elif response.status_code == 409:
logging.error("Conflict. The resource cannot be saved due to a conflict.")
sys.exit(ec.CONFLICT)
elif response.status_code == 429:
logging.error("Too many requests. You have made too many requests in a short amount of time.")
return True
elif response.status_code == 500:
logging.error("Internal server error. The API experienced an unexpected error.")
return True
elif response.status_code == 503:
logging.error("Service unavailable. The API is temporarily disabled or a request timeout occurred.")
return True
else:
response.raise_for_status()
return False
def fetch_and_cache_entity_data(self): def fetch_and_cache_entity_data(self):
""" """
Fetch and cache data for all entities. Fetch and cache data for all entities.
""" """
for entity in self.entities: for entity in self.entities:
# if we already have files in the raw data folder, we need to skip that entity
file_path = f'data/raw/{entity}' file_path = f'data/raw/{entity}'
if os.path.exists(file_path) and os.listdir(file_path): if os.path.exists(file_path) and os.listdir(file_path):
logging.warning(f"Skipping entity: {entity} as the raw data folder is not empty.") logging.warning(f"Raw data exists for {entity} processing any raw data we already have.")
continue break # break here instead of continue as we dont want to update our server knowledge cache and potentially miss data.
last_knowledge = self.knowledge_cache.get(entity, 0) last_knowledge = self.knowledge_cache.get(entity, 0)
logging.debug(f'Last Knowledge of {entity.capitalize()}: {last_knowledge}') #logging.debug(f'Last Knowledge of {entity}: {last_knowledge}')
url = f'{self.base_url}/{self.budget_id}/{entity}' logging.info(f'Fetching {entity} data since last knowledge: {last_knowledge}')
if last_knowledge: url = f'{self.base_url}/{self.budget_id}/{entity}?last_knowledge_of_server={last_knowledge}'
logging.info(f'Fetching {entity} data since last knowledge: {last_knowledge}')
url = url + f'?last_knowledge_of_server={last_knowledge}'
response = requests.get(url, headers=self.headers) for attempt in range(self.MAX_RETRIES):
if response.status_code == 401: try:
logging.error("Unauthorized. Please check your API token.") response = requests.get(url, headers=self.headers)
break should_retry = self.handle_response(response)
if not should_retry:
self.check_rate_limit(response) break # Exit the loop if the request is successful
except requests.exceptions.RequestException as e:
if response.status_code == 429: logging.error(f"Error fetching {entity} data (attempt {attempt + 1}/{self.MAX_RETRIES}): {e}")
logging.error("Rate limit exceeded. Pausing until the limit is reset.") if attempt < self.MAX_RETRIES - 1:
# Implement pause until the limit reset logic here time.sleep(self.RETRY_DELAY) # Wait before retrying
break else:
logging.error("Max retries reached. Exiting.")
sys.exit(ec.REQUESTS_ERROR)
data = response.json() data = response.json()
server_knowledge = data['data'].get('server_knowledge') server_knowledge = data['data'].get('server_knowledge')
logging.debug(f'{entity.capitalize()} Server Knowledge: {server_knowledge}') logging.debug(f'{entity} new server knowledge: {server_knowledge}')
if server_knowledge is not None and server_knowledge != last_knowledge: if server_knowledge is not None and server_knowledge != last_knowledge:
self.update_server_knowledge_cache(entity, server_knowledge) self.update_server_knowledge_cache(entity, server_knowledge)
@@ -114,3 +158,6 @@ class Ingest:
self.save_entity_data_to_raw(entity, entity_data) self.save_entity_data_to_raw(entity, entity_data)
else: else:
logging.info(f"No new data for {entity}. Skipping cache update.") logging.info(f"No new data for {entity}. Skipping cache update.")
if self.check_rate_limit(response):
break # break out here and continue processing the data we have.
+112 -63
View File
@@ -1,8 +1,10 @@
import os import os
import json import json
import logging import logging
import sys
from datetime import datetime from datetime import datetime
from typing import List, Dict, Any from typing import Dict, Any
import config.exit_codes as ec
import polars as pl import polars as pl
class RawToBase: class RawToBase:
@@ -14,7 +16,6 @@ class RawToBase:
self.base_data_path = config['base_data_path'] self.base_data_path = config['base_data_path']
self.data = {} self.data = {}
self.base_data = {} self.base_data = {}
logging.basicConfig(level=logging.DEBUG)
self.process_entities() self.process_entities()
def process_entities(self): def process_entities(self):
@@ -22,7 +23,6 @@ class RawToBase:
# check the file is in the raw data path, if not skip the entity # check the file is in the raw data path, if not skip the entity
folder_path = os.path.join(self.raw_data_path, entity) folder_path = os.path.join(self.raw_data_path, entity)
folder_contents = os.listdir(folder_path) folder_contents = os.listdir(folder_path)
# Check if the folder is empty
if not folder_contents: if not folder_contents:
logging.warning(f"The folder {folder_path} is empty skipping {entity}.") logging.warning(f"The folder {folder_path} is empty skipping {entity}.")
continue continue
@@ -31,61 +31,94 @@ class RawToBase:
continue continue
self._load_existing_base_data(entity) self._load_existing_base_data(entity)
self._combine_data(entity) self._combine_data(entity)
self._resolve_duplicates(entity) if not self._resolve_duplicates(entity):
self._save_base_data(entity) logging.error(f"entity: {entity} failed duplicate resolution.")
self._move_raw_to_processed(entity) sys.exit(ec.DUPLICATE_RESOLUTION_ERROR)
if not self._save_base_data(entity):
logging.error(f"Skipping processing for entity: {entity} due to failed saving base data.")
continue
if not self._move_raw_to_processed(entity):
logging.error(f"entity: {entity} has been processed, but we could not move the file out of the raw folder, please clear the raw folder for {entity}.")
sys.exit(ec.MOVE_FILE_ERROR)
def _load_raw_data(self, entity): def _load_raw_data(self, entity):
entity_path = os.path.join(self.raw_data_path, entity) entity_path = os.path.join(self.raw_data_path, entity)
self.data[entity] = [] self.data[entity] = []
logging.debug(f"Loading data for entity: {entity} from path: {entity_path}") logging.debug(f"Loading data for entity: {entity} from path: {entity_path}")
for file_name in os.listdir(entity_path): files = [f for f in os.listdir(entity_path) if f.endswith('.json')]
if file_name.endswith('.json'):
file_path = os.path.join(entity_path, file_name) if len(files) > 1:
logging.debug(f"Reading file: {file_path}") logging.error(f"""More than one file found in path: {entity_path}. Skipping processing for entity: {entity}.
try: recommended actions is to move the newest file(s) out, re-run main.py.
with open(file_path, 'r') as f: Then move the files back in one at a time oldest to newest and run again for each file""")
data = json.load(f) return False
# Check if the data is empty
if entity == "categories": if len(files) == 1:
# Check if any category group has categories file_name = files[0]
has_categories = any(group.get("categories") for group in data.get("category_groups", [])) file_path = os.path.join(entity_path, file_name)
if not has_categories: logging.debug(f"Reading file: {file_path}")
logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.") try:
os.remove(file_path) with open(file_path, 'r') as f:
return False data = json.load(f)
else: except Exception as e:
if not data.get(entity, []): logging.error(f"Failed to load data from file: {file_path}, error: {e}")
logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.") return False
# delete the file as it is empty
os.remove(file_path) if self._is_data_empty(entity, data, file_path):
return False return False
modified_data = []
if entity == 'categories': modified_data = self._add_ingestion_date(entity, data, file_name)
for group in data.get('category_groups', []):
for category in group.get('categories', []): self.data[entity].append(modified_data)
category['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date() logging.debug(f"Successfully loaded data from file: {file_path}")
modified_data.append(category)
else:
for record in data.get(f'{entity}', []):
if isinstance(record, dict):
record['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()
modified_data.append(record)
else:
modified_data.append({'record': record, 'ingestion_date': datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()})
self.data[entity].append(modified_data)
logging.debug(f"Successfully loaded data from file: {file_path}")
except Exception as e:
logging.error(f"Failed to load data from file: {file_path}, error: {e}")
exit(1)
return True return True
def _is_data_empty(self, entity, data, file_path):
logging.debug(f"Checking if data is empty for entity: {entity}")
if entity == "categories":
has_categories = any(group.get("categories") for group in data.get("category_groups", []))
if not has_categories:
logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.")
os.remove(file_path)
return True
else:
if not data.get(entity, []):
logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.")
os.remove(file_path)
return True
logging.debug(f"Data is not empty for entity: {entity}")
return False
def _add_ingestion_date(self, entity, data, file_name):
modified_data = []
ingestion_date = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()
logging.debug(f"Adding ingestion date to data for entity: {entity}")
if entity == 'categories':
for group in data.get('category_groups', []):
for category in group.get('categories', []):
category['ingestion_date'] = ingestion_date
modified_data.append(category)
else:
for record in data.get(f'{entity}', []):
if isinstance(record, dict):
record['ingestion_date'] = ingestion_date
modified_data.append(record)
else:
modified_data.append({'record': record, 'ingestion_date': ingestion_date})
logging.debug(f"Successfully added ingestion date to data for entity: {entity}")
return modified_data
def _load_existing_base_data(self, entity): def _load_existing_base_data(self, entity):
base_path = os.path.join(self.base_data_path, f'{entity}.parquet') base_path = os.path.join(self.base_data_path, f'{entity}.parquet')
if os.path.exists(base_path): if os.path.exists(base_path):
logging.debug(f"Loading existing base data for entity: {entity} from path: {base_path}") logging.debug(f"Loading existing base data for entity: {entity} from path: {base_path}")
self.base_data[entity] = pl.read_parquet(base_path) try:
self.base_data[entity] = pl.read_parquet(base_path)
except Exception as e:
logging.error(f"Failed to load existing base data for entity: {entity}, error: {e}, Creating an empty DataFrame")
self.base_data[entity] = pl.DataFrame()
logging.debug(f"Successfully loaded existing base data for entity: {entity}") logging.debug(f"Successfully loaded existing base data for entity: {entity}")
else: else:
self.base_data[entity] = pl.DataFrame() self.base_data[entity] = pl.DataFrame()
@@ -103,7 +136,6 @@ class RawToBase:
combined_data.extend(data) combined_data.extend(data)
new_data_df = pl.DataFrame(combined_data) new_data_df = pl.DataFrame(combined_data)
#print(new_data_df)
# Ensure the unique id column is preserved # Ensure the unique id column is preserved
unique_id = self.primary_keys[entity]['unique_id'] unique_id = self.primary_keys[entity]['unique_id']
@@ -117,35 +149,52 @@ class RawToBase:
def _resolve_duplicates(self, entity): def _resolve_duplicates(self, entity):
logging.debug(f"Resolving duplicates for entity: {entity}") logging.debug(f"Resolving duplicates for entity: {entity}")
unique_id = self.primary_keys[entity]['unique_id'] unique_id = self.primary_keys[entity]['unique_id']
self.base_data[entity] = self.base_data[entity].sort(by='ingestion_date').unique(subset=unique_id, keep='first') try:
self.base_data[entity] = self.base_data[entity].sort(by='ingestion_date').unique(subset=unique_id, keep='first')
except Exception as e:
logging.error(f"Failed to resolve duplicates for entity: {entity}, error: {e}")
return False
logging.debug(f"Successfully resolved duplicates for entity: {entity}") logging.debug(f"Successfully resolved duplicates for entity: {entity}")
return True
def _save_base_data(self, entity): def _save_base_data(self, entity):
os.makedirs(self.base_data_path, exist_ok=True) os.makedirs(self.base_data_path, exist_ok=True)
file_path = os.path.join(self.base_data_path, f'{entity}.parquet') file_path = os.path.join(self.base_data_path, f'{entity}.parquet')
self.base_data[entity].write_parquet(file_path) try:
self.base_data[entity].write_parquet(file_path)
except Exception as e:
logging.error(f"Failed to save base data for entity: {entity}, error: {e}")
return False
logging.debug(f"Saved base data for entity: {entity} to path: {file_path}") logging.debug(f"Saved base data for entity: {entity} to path: {file_path}")
return True
def _move_raw_to_processed(self, entity): def _move_raw_to_processed(self, entity):
raw_entity_path = os.path.join(self.raw_data_path, entity) raw_entity_path = os.path.join(self.raw_data_path, entity)
processed_path = os.path.join(self.processed_data_path, entity) processed_path = os.path.join(self.processed_data_path, entity)
# logging.debug(f"Raw entity path: {raw_entity_path}")
# logging.debug(f"Processed path: {processed_path}")
os.makedirs(processed_path, exist_ok=True) os.makedirs(processed_path, exist_ok=True)
for file_name in os.listdir(raw_entity_path): try:
if file_name.endswith('.json'): files = [f for f in os.listdir(raw_entity_path) if f.endswith('.json')]
raw_file_path = os.path.join(raw_entity_path, file_name) if len(files) != 1:
processed_file_path = os.path.join(processed_path, file_name) logging.error(f"Expected exactly one file in path: {raw_entity_path}, but found {len(files)}")
return False
logging.debug(f"Moving file: {raw_file_path} to {processed_file_path}") file_name = files[0]
raw_file_path = os.path.join(raw_entity_path, file_name)
processed_file_path = os.path.join(processed_path, file_name)
if os.path.exists(raw_file_path): logging.debug(f"Moving file: {raw_file_path} to {processed_file_path}")
os.rename(raw_file_path, processed_file_path)
logging.debug(f"Moved file: {file_name}")
else:
logging.error(f"File not found: {raw_file_path}")
logging.debug(f"Moved processed files for entity: {entity} to path: {processed_path}") os.rename(raw_file_path, processed_file_path)
logging.debug(f"Moved file: {file_name} to processed")
except FileNotFoundError as e:
logging.error(f"File not found: {e}")
return False
except Exception as e:
logging.error(f"Failed to move file for entity: {entity}, error: {e}")
return False
logging.debug(f"Moved processed file for entity: {entity} to path: {processed_path}")
return True