Merge pull request #2 from Jake-Pullen/Develop
1.1 Release, Better Error Handling and Better Logging
This commit is contained in:
@@ -7,3 +7,4 @@ data/*
|
||||
__pycache__/*
|
||||
*/__pycache__/*
|
||||
*.pbix
|
||||
/logs/*
|
||||
@@ -24,3 +24,5 @@ raw_data_path: data/raw
|
||||
processed_data_path: data/processed
|
||||
base_data_path: data/base
|
||||
warehouse_data_path: data/warehouse
|
||||
REQUESTS_MAX_RETRIES: 3
|
||||
REQUESTS_RETRY_DELAY: 5
|
||||
@@ -0,0 +1,41 @@
|
||||
import datetime as dt
|
||||
import json
|
||||
import logging
|
||||
from typing import override
|
||||
|
||||
class custom_json_logger(logging.Formatter):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
format_keys: dict[str,str] | None = None,
|
||||
):
|
||||
super().__init__()
|
||||
self.format_keys = format_keys if format_keys is not None else {}
|
||||
|
||||
@override
|
||||
def format(self, record: logging.LogRecord) -> str:
|
||||
record_dict = self._prepare_log_dict(record)
|
||||
return json.dumps(record_dict, default=str)
|
||||
|
||||
def _prepare_log_dict(self, record: logging.LogRecord) -> dict:
|
||||
always_fields = {
|
||||
"message" : record.getMessage(),
|
||||
"timestamp" : dt.datetime.fromtimestamp(
|
||||
record.created, tz=dt.timezone.utc
|
||||
).isoformat(),
|
||||
}
|
||||
if record.exc_info is not None:
|
||||
always_fields["exc_info"] = self.formatException(record.exc_info)
|
||||
|
||||
if record.stack_info is not None:
|
||||
always_fields["stack_info"] = self.formatStack(record.stack_info)
|
||||
|
||||
message = {
|
||||
key: msg_val
|
||||
if (msg_val := always_fields.pop(val, None)) is not None
|
||||
else getattr(record, val)
|
||||
for key, val in self.format_keys.items()
|
||||
}
|
||||
message.update(always_fields)
|
||||
return message
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
SUCCESS = 0
|
||||
MISSING_ENV_VARS = 1
|
||||
MISSING_CONFIG_FILE = 2
|
||||
CORRUPTED_CONFIG_FILE = 3
|
||||
UNAUTHORIZED_API_TOKEN = 4
|
||||
REQUESTS_ERROR = 5
|
||||
BAD_REQUEST = 6
|
||||
FORBIDDEN = 7
|
||||
NOT_FOUND = 8
|
||||
CONFLICT = 9
|
||||
MOVE_FILE_ERROR = 10
|
||||
DUPLICATE_RESOLUTION_ERROR = 11
|
||||
@@ -0,0 +1,41 @@
|
||||
version: 1
|
||||
disable_existing_loggers: False
|
||||
formatters:
|
||||
simple:
|
||||
format: "%(asctime)s - %(levelname)s - %(module)s - %(funcName)s - %(message)s"
|
||||
datefmt: "%Y-%m-%d %H:%M:%S%z"
|
||||
json:
|
||||
"()": config.custom_json_logger.custom_json_logger
|
||||
format_keys:
|
||||
level: levelname
|
||||
timestamp: timestamp
|
||||
logger: name
|
||||
module: module
|
||||
function: funcName
|
||||
line: lineno
|
||||
message: message
|
||||
thread_name: threadName
|
||||
handlers:
|
||||
stderr:
|
||||
class: logging.StreamHandler
|
||||
level: INFO
|
||||
formatter: simple
|
||||
stream: ext://sys.stdout
|
||||
file:
|
||||
class: logging.handlers.RotatingFileHandler
|
||||
level: DEBUG
|
||||
formatter: json
|
||||
filename: logs/dpfy_log.jsonl
|
||||
maxBytes: 10485760 # 10MB
|
||||
backupCount: 10
|
||||
queue_handler:
|
||||
class: logging.handlers.QueueHandler
|
||||
handlers:
|
||||
- stderr
|
||||
- file
|
||||
respect_handler_level: True
|
||||
loggers:
|
||||
root:
|
||||
level: DEBUG
|
||||
handlers:
|
||||
- queue_handler
|
||||
@@ -2,25 +2,61 @@ import os
|
||||
import dotenv
|
||||
import logging
|
||||
import yaml
|
||||
import sys
|
||||
import atexit
|
||||
import logging.config
|
||||
import logging.handlers
|
||||
|
||||
import config.exit_codes as ec
|
||||
from pipeline.ingest import Ingest
|
||||
from pipeline.raw_to_base import RawToBase
|
||||
from pipeline.dimensions import DimAccounts, DimCategories, DimPayees, DimDate
|
||||
from pipeline.facts import FactTransactions, FactScheduledTransactions
|
||||
|
||||
def set_up_logging():
|
||||
try:
|
||||
with open('config/logging_config.yaml', 'r') as f:
|
||||
log_config = yaml.safe_load(f)
|
||||
logging.config.dictConfig(log_config)
|
||||
except yaml.YAMLError as e:
|
||||
print(f"Error parsing logging configuration file: {e}")
|
||||
log_config = {} # Initialize log_config to an empty dictionary
|
||||
logging.basicConfig(level=logging.INFO) # Fallback to a basic configuration
|
||||
queue_handler = logging.getHandlerByName('queue_handler')
|
||||
if queue_handler is not None:
|
||||
queue_handler.listener.start()
|
||||
atexit.register(queue_handler.listener.stop)
|
||||
|
||||
logger = logging.getLogger("data_pipeline_for_ynab")
|
||||
os.makedirs('logs', exist_ok=True)
|
||||
set_up_logging()
|
||||
|
||||
# Load environment variables
|
||||
dotenv.load_dotenv()
|
||||
|
||||
API_TOKEN = os.getenv('API_TOKEN')
|
||||
BUDGET_ID = os.getenv('BUDGET_ID')
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
with open('config.yaml', 'r') as file:
|
||||
config = yaml.safe_load(file)
|
||||
def main():
|
||||
if not API_TOKEN or not BUDGET_ID:
|
||||
logging.error('API_TOKEN or BUDGET_ID is not set in .env file')
|
||||
sys.exit(ec.MISSING_ENV_VARS)
|
||||
|
||||
config['API_TOKEN'] = API_TOKEN
|
||||
config['BUDGET_ID'] = BUDGET_ID
|
||||
try:
|
||||
with open('config/config.yaml', 'r') as file:
|
||||
config = yaml.safe_load(file)
|
||||
except FileNotFoundError:
|
||||
logging.error('config.yaml file not found')
|
||||
sys.exit(ec.MISSING_CONFIG_FILE)
|
||||
except yaml.YAMLError as e:
|
||||
logging.error(f'Error loading config.yaml: {e}')
|
||||
sys.exit(ec.CORRUPTED_CONFIG_FILE)
|
||||
|
||||
config['API_TOKEN'] = API_TOKEN
|
||||
config['BUDGET_ID'] = BUDGET_ID
|
||||
|
||||
logging.info('Starting data pipeline')
|
||||
|
||||
if __name__ == '__main__':
|
||||
Ingest(config)
|
||||
RawToBase(config)
|
||||
DimAccounts(config)
|
||||
@@ -29,3 +65,17 @@ if __name__ == '__main__':
|
||||
DimDate(config)
|
||||
FactTransactions(config)
|
||||
FactScheduledTransactions(config)
|
||||
|
||||
logging.info('Data pipeline completed successfully')
|
||||
sys.exit(ec.SUCCESS)
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
main()
|
||||
except SystemExit as e:
|
||||
exit_code = e.code
|
||||
if exit_code == ec.SUCCESS:
|
||||
logging.info('Program exited successfully')
|
||||
else:
|
||||
logging.error(f'Program exited with code {exit_code}')
|
||||
raise
|
||||
|
||||
+133
-77
@@ -21,40 +21,51 @@ class DimAccounts(Dimensions):
|
||||
|
||||
def transform(self):
|
||||
# Read the parquet file into a polars DataFrame
|
||||
accounts_df = pl.read_parquet(self.file_path)
|
||||
try:
|
||||
accounts_df = pl.read_parquet(self.file_path)
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to read the base accounts parquet file: {e}")
|
||||
return
|
||||
|
||||
# Transform the DataFrame
|
||||
logging.info("Transforming the accounts DataFrame")
|
||||
accounts_df = (
|
||||
accounts_df
|
||||
.with_columns([
|
||||
pl.col("id").alias("account_id"),
|
||||
pl.col("name").alias("account_name"),
|
||||
pl.col("type").alias("account_type"),
|
||||
pl.col("on_budget").alias("on_budget"),
|
||||
pl.col("closed").alias("closed"),
|
||||
pl.col("note").alias("note"),
|
||||
pl.col("balance").alias("balance"),
|
||||
pl.col("cleared_balance").alias("cleared_balance"),
|
||||
pl.col("uncleared_balance").alias("uncleared_balance"),
|
||||
pl.col("deleted").alias("deleted"),
|
||||
])
|
||||
.with_columns([
|
||||
pl.col("note").fill_null("unknown"),
|
||||
(pl.col("balance") / 100).alias("balance"),
|
||||
(pl.col("cleared_balance") / 100).alias("cleared_balance"),
|
||||
(pl.col("uncleared_balance") / 100).alias("uncleared_balance"),
|
||||
])
|
||||
.drop([
|
||||
"transfer_payee_id", "direct_import_linked", "direct_import_in_error",
|
||||
"last_reconciled_at", "debt_original_balance", "debt_interest_rates",
|
||||
"debt_minimum_payments", "debt_escrow_amounts", "ingestion_date"
|
||||
])
|
||||
)
|
||||
try:
|
||||
accounts_df = (
|
||||
accounts_df
|
||||
.with_columns([
|
||||
pl.col("id").alias("account_id"),
|
||||
pl.col("name").alias("account_name"),
|
||||
pl.col("type").alias("account_type"),
|
||||
pl.col("on_budget").alias("on_budget"),
|
||||
pl.col("closed").alias("closed"),
|
||||
pl.col("note").alias("note"),
|
||||
pl.col("balance").alias("balance"),
|
||||
pl.col("cleared_balance").alias("cleared_balance"),
|
||||
pl.col("uncleared_balance").alias("uncleared_balance"),
|
||||
pl.col("deleted").alias("deleted"),
|
||||
])
|
||||
.with_columns([
|
||||
pl.col("note").fill_null("unknown"),
|
||||
(pl.col("balance") / 100).alias("balance"),
|
||||
(pl.col("cleared_balance") / 100).alias("cleared_balance"),
|
||||
(pl.col("uncleared_balance") / 100).alias("uncleared_balance"),
|
||||
])
|
||||
.drop([
|
||||
"transfer_payee_id", "direct_import_linked", "direct_import_in_error",
|
||||
"last_reconciled_at", "debt_original_balance", "debt_interest_rates",
|
||||
"debt_minimum_payments", "debt_escrow_amounts", "ingestion_date"
|
||||
])
|
||||
)
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to transform the accounts DataFrame: {e}")
|
||||
return
|
||||
# Write the DataFrame to a new parquet file
|
||||
logging.info("Writing the transformed accounts DataFrame to parquet file")
|
||||
accounts_df.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet')
|
||||
|
||||
try:
|
||||
accounts_df.write_parquet(self.config['warehouse_data_path'] + '/accounts.parquet')
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to write the transformed accounts DataFrame to parquet file: {e}")
|
||||
return
|
||||
|
||||
class DimCategories(Dimensions):
|
||||
def __init__(self, config):
|
||||
@@ -64,35 +75,51 @@ class DimCategories(Dimensions):
|
||||
|
||||
def transform(self):
|
||||
# Read the parquet file into a polars DataFrame
|
||||
categories_df = pl.read_parquet(self.file_path)
|
||||
try:
|
||||
categories_df = pl.read_parquet(self.file_path)
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to read the base categories parquet file: {e}")
|
||||
return
|
||||
logging.info("Transforming the categories DataFrame")
|
||||
# Select the required columns
|
||||
categories_df = categories_df.select([
|
||||
'id',
|
||||
'name',
|
||||
'category_group_name',
|
||||
'hidden',
|
||||
'note',
|
||||
'budgeted',
|
||||
'activity',
|
||||
'balance',
|
||||
'deleted'
|
||||
])
|
||||
# Rename the columns
|
||||
categories_df = categories_df.with_columns(pl.col('id').alias('category_id'))
|
||||
categories_df = categories_df.with_columns(pl.col('name').alias('category_name'))
|
||||
try:
|
||||
categories_df = categories_df.select([
|
||||
'id',
|
||||
'name',
|
||||
'category_group_name',
|
||||
'hidden',
|
||||
'note',
|
||||
'budgeted',
|
||||
'activity',
|
||||
'balance',
|
||||
'deleted'
|
||||
])
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to select columns from the categories DataFrame: {e}")
|
||||
return
|
||||
|
||||
# Fill null values in the note column
|
||||
categories_df = categories_df.with_columns(pl.col('note').fill_null('unknown'))
|
||||
try:
|
||||
# Rename the columns
|
||||
categories_df = categories_df.with_columns(pl.col('id').alias('category_id'))
|
||||
categories_df = categories_df.with_columns(pl.col('name').alias('category_name'))
|
||||
|
||||
# Convert the balance, budgeted, and activity columns to decimal
|
||||
categories_df = categories_df.with_columns(pl.col('balance') / 100)
|
||||
categories_df = categories_df.with_columns(pl.col('budgeted') / 100)
|
||||
categories_df = categories_df.with_columns(pl.col('activity') / 100)
|
||||
# Fill null values in the note column
|
||||
categories_df = categories_df.with_columns(pl.col('note').fill_null('unknown'))
|
||||
|
||||
# Convert the balance, budgeted, and activity columns to decimal
|
||||
categories_df = categories_df.with_columns(pl.col('balance') / 100)
|
||||
categories_df = categories_df.with_columns(pl.col('budgeted') / 100)
|
||||
categories_df = categories_df.with_columns(pl.col('activity') / 100)
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to transform the categories DataFrame: {e}")
|
||||
return
|
||||
|
||||
# Write the DataFrame to a new parquet file
|
||||
logging.info("Writing the transformed categories DataFrame to parquet file")
|
||||
categories_df.write_parquet(self.config['warehouse_data_path'] + '/categories.parquet')
|
||||
try:
|
||||
categories_df.write_parquet(self.config['warehouse_data_path'] + '/categories.parquet')
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to write the transformed categories DataFrame to parquet file: {e}")
|
||||
return
|
||||
|
||||
class DimPayees(Dimensions):
|
||||
def __init__(self, config):
|
||||
@@ -102,22 +129,36 @@ class DimPayees(Dimensions):
|
||||
|
||||
def transform(self):
|
||||
# Read the parquet file into a polars DataFrame
|
||||
payees_df = pl.read_parquet(self.file_path)
|
||||
try:
|
||||
payees_df = pl.read_parquet(self.file_path)
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to read the base payees parquet file: {e}")
|
||||
return
|
||||
logging.info("Transforming the payees DataFrame")
|
||||
# Select the required columns
|
||||
payees_df = payees_df.select([
|
||||
'id',
|
||||
'name',
|
||||
'deleted'
|
||||
])
|
||||
# Rename the columns
|
||||
payees_df = payees_df.with_columns(pl.col('id').alias('payee_id'))
|
||||
payees_df = payees_df.with_columns(pl.col('name').alias('payee_name'))
|
||||
try:
|
||||
payees_df = payees_df.select([
|
||||
'id',
|
||||
'name',
|
||||
'deleted'
|
||||
])
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to select columns from the payees DataFrame: {e}")
|
||||
return
|
||||
try:
|
||||
# Rename the columns
|
||||
payees_df = payees_df.with_columns(pl.col('id').alias('payee_id'))
|
||||
payees_df = payees_df.with_columns(pl.col('name').alias('payee_name'))
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to rename columns in the payees DataFrame: {e}")
|
||||
return
|
||||
|
||||
# Write the DataFrame to a new parquet file
|
||||
logging.info("Writing the transformed payees DataFrame to parquet file")
|
||||
payees_df.write_parquet(self.config['warehouse_data_path'] + '/payees.parquet')
|
||||
|
||||
try:
|
||||
payees_df.write_parquet(self.config['warehouse_data_path'] + '/payees.parquet')
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to write the transformed payees DataFrame to parquet file: {e}")
|
||||
return
|
||||
|
||||
class DimDate(Dimensions):
|
||||
def __init__(self, config):
|
||||
@@ -126,20 +167,35 @@ class DimDate(Dimensions):
|
||||
|
||||
def transform(self):
|
||||
# Create a DataFrame with dates from 2020-01-01 to 2030-12-31
|
||||
dates_df = pl.DataFrame({'date':pl.date_range(date(2020, 1, 1), date(2030, 12, 31), "1d", eager=True)})
|
||||
|
||||
try:
|
||||
dates_df = pl.DataFrame({'date':pl.date_range(date(2020, 1, 1), date(2030, 12, 31), "1d", eager=True)})
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to create a DataFrame with dates: {e}")
|
||||
return
|
||||
# Extract year, month, day, and weekday from the date column
|
||||
dates_df = dates_df.with_columns([
|
||||
pl.col('date').dt.year().alias('year'),
|
||||
pl.col('date').dt.month().alias('month'),
|
||||
pl.col('date').dt.day().alias('day'),
|
||||
pl.col('date').dt.weekday().alias('weekday')
|
||||
])
|
||||
# Create a new column to indicate if the date is a weekday or weekend
|
||||
dates_df = dates_df.with_columns([
|
||||
(pl.col('weekday') < 5).alias('is_weekday') # True for weekdays (Monday to Friday), False for weekends (Saturday and Sunday)
|
||||
])
|
||||
try:
|
||||
dates_df = dates_df.with_columns([
|
||||
pl.col('date').dt.year().alias('year'),
|
||||
pl.col('date').dt.month().alias('month'),
|
||||
pl.col('date').dt.day().alias('day'),
|
||||
pl.col('date').dt.weekday().alias('weekday')
|
||||
])
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to extract year, month, day, and weekday from the date column: {e}")
|
||||
return
|
||||
try:
|
||||
# Create a new column to indicate if the date is a weekday or weekend
|
||||
dates_df = dates_df.with_columns([
|
||||
(pl.col('weekday') < 5).alias('is_weekday') # True for weekdays (Monday to Friday), False for weekends (Saturday and Sunday)
|
||||
])
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to create a new column to indicate if the date is a weekday or weekend: {e}")
|
||||
return
|
||||
# Write the DataFrame to a new parquet file
|
||||
logging.info("Writing the transformed dates DataFrame to parquet file")
|
||||
dates_df.write_parquet(self.config['warehouse_data_path'] + '/dates.parquet')
|
||||
try:
|
||||
dates_df.write_parquet(self.config['warehouse_data_path'] + '/dates.parquet')
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to write the transformed dates DataFrame to parquet file: {e}")
|
||||
return
|
||||
|
||||
|
||||
+71
-57
@@ -1,7 +1,6 @@
|
||||
import polars as pl
|
||||
import logging
|
||||
import os
|
||||
from datetime import date
|
||||
|
||||
class Facts:
|
||||
def __init__(self, config):
|
||||
@@ -13,7 +12,6 @@ class Facts:
|
||||
return f"{self.base_file_path}/{file_name}"
|
||||
|
||||
class FactTransactions(Facts):
|
||||
|
||||
def __init__(self, config):
|
||||
super().__init__(config)
|
||||
self.file_path = self.get_full_file_path('transactions.parquet')
|
||||
@@ -21,43 +19,52 @@ class FactTransactions(Facts):
|
||||
|
||||
def transform(self):
|
||||
# Read the parquet file into a polars DataFrame
|
||||
transactions_df = pl.read_parquet(self.file_path)
|
||||
try:
|
||||
transactions_df = pl.read_parquet(self.file_path)
|
||||
except FileNotFoundError:
|
||||
logging.error("The transactions DataFrame does not exist")
|
||||
return
|
||||
|
||||
# Transform the DataFrame
|
||||
logging.info("Transforming the transactions DataFrame")
|
||||
transactions_df = (
|
||||
transactions_df
|
||||
.with_columns([
|
||||
pl.col("id").alias("transaction_id"),
|
||||
pl.col("date").alias("transaction_date"),
|
||||
pl.col("amount").alias("transaction_amount"),
|
||||
pl.col("memo").alias("transaction_memo"),
|
||||
pl.col("cleared").alias("transaction_cleared"),
|
||||
pl.col("approved").alias("transaction_approved"),
|
||||
pl.col("flag_color").alias("transaction_flag_color"),
|
||||
pl.col("account_id").alias("account_id"),
|
||||
pl.col("payee_id").alias("payee_id"),
|
||||
pl.col("category_id").alias("category_id"),
|
||||
pl.col("transfer_account_id").alias("transfer_account_id"),
|
||||
])
|
||||
.with_columns([
|
||||
pl.col("memo").fill_null("unknown"),
|
||||
(pl.col("amount") / 100).alias("transaction_amount"),
|
||||
])
|
||||
.drop([
|
||||
"transfer_transaction_id", "matched_transaction_id", "import_id",
|
||||
"subtransactions", "deleted","flag_name","account_name",
|
||||
"payee_name","category_name","import_payee_name","import_payee_name_original",
|
||||
"debt_transaction_type","ingestion_date"
|
||||
])
|
||||
)
|
||||
|
||||
try:
|
||||
transactions_df = (
|
||||
transactions_df
|
||||
.with_columns([
|
||||
pl.col("id").alias("transaction_id"),
|
||||
pl.col("date").alias("transaction_date"),
|
||||
pl.col("amount").alias("transaction_amount"),
|
||||
pl.col("memo").alias("transaction_memo"),
|
||||
pl.col("cleared").alias("transaction_cleared"),
|
||||
pl.col("approved").alias("transaction_approved"),
|
||||
pl.col("flag_color").alias("transaction_flag_color"),
|
||||
pl.col("account_id").alias("account_id"),
|
||||
pl.col("payee_id").alias("payee_id"),
|
||||
pl.col("category_id").alias("category_id"),
|
||||
pl.col("transfer_account_id").alias("transfer_account_id"),
|
||||
])
|
||||
.with_columns([
|
||||
pl.col("memo").fill_null("unknown"),
|
||||
(pl.col("amount") / 100).alias("transaction_amount"),
|
||||
])
|
||||
.drop([
|
||||
"transfer_transaction_id", "matched_transaction_id", "import_id",
|
||||
"subtransactions", "deleted","flag_name","account_name",
|
||||
"payee_name","category_name","import_payee_name","import_payee_name_original",
|
||||
"debt_transaction_type","ingestion_date"
|
||||
])
|
||||
)
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to transform the transactions DataFrame: {e}")
|
||||
return
|
||||
# Write the DataFrame to a new parquet file
|
||||
logging.info("Writing the transformed transactions DataFrame to parquet file")
|
||||
transactions_df.write_parquet(self.config['warehouse_data_path'] + '/transactions.parquet')
|
||||
try:
|
||||
transactions_df.write_parquet(self.config['warehouse_data_path'] + '/transactions.parquet')
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to write the transformed transactions DataFrame: {e}")
|
||||
|
||||
class FactScheduledTransactions(Facts):
|
||||
|
||||
def __init__(self, config):
|
||||
super().__init__(config)
|
||||
self.file_path = self.get_full_file_path('scheduled_transactions.parquet')
|
||||
@@ -73,30 +80,37 @@ class FactScheduledTransactions(Facts):
|
||||
|
||||
# Transform the DataFrame
|
||||
logging.info("Transforming the scheduled transactions DataFrame")
|
||||
scheduled_transactions_df = (
|
||||
scheduled_transactions_df
|
||||
.with_columns([
|
||||
pl.col("id").alias("scheduled_transaction_id"),
|
||||
pl.col("date_first").alias("scheduled_transaction_first_date"),
|
||||
pl.col("date_next").alias("scheduled_transaction_next_date"),
|
||||
pl.col("frequency").alias("scheduled_transaction_frequency"),
|
||||
pl.col("amount").alias("scheduled_transaction_amount"),
|
||||
pl.col("memo").alias("scheduled_transaction_memo"),
|
||||
pl.col("flag_color").alias("scheduled_transaction_flag_color"),
|
||||
pl.col("account_id").alias("account_id"),
|
||||
pl.col("payee_id").alias("payee_id"),
|
||||
pl.col("category_id").alias("category_id"),
|
||||
pl.col("transfer_account_id").alias("transfer_account_id"),
|
||||
])
|
||||
.with_columns([
|
||||
pl.col("memo").fill_null("unknown"),
|
||||
(pl.col("amount") / 100).alias("scheduled_transaction_amount"),
|
||||
])
|
||||
.drop([
|
||||
"subtransactions", "deleted","flag_name","account_name",
|
||||
"payee_name","category_name","ingestion_date"
|
||||
])
|
||||
)
|
||||
try:
|
||||
scheduled_transactions_df = (
|
||||
scheduled_transactions_df
|
||||
.with_columns([
|
||||
pl.col("id").alias("scheduled_transaction_id"),
|
||||
pl.col("date_first").alias("scheduled_transaction_first_date"),
|
||||
pl.col("date_next").alias("scheduled_transaction_next_date"),
|
||||
pl.col("frequency").alias("scheduled_transaction_frequency"),
|
||||
pl.col("amount").alias("scheduled_transaction_amount"),
|
||||
pl.col("memo").alias("scheduled_transaction_memo"),
|
||||
pl.col("flag_color").alias("scheduled_transaction_flag_color"),
|
||||
pl.col("account_id").alias("account_id"),
|
||||
pl.col("payee_id").alias("payee_id"),
|
||||
pl.col("category_id").alias("category_id"),
|
||||
pl.col("transfer_account_id").alias("transfer_account_id"),
|
||||
])
|
||||
.with_columns([
|
||||
pl.col("memo").fill_null("unknown"),
|
||||
(pl.col("amount") / 100).alias("scheduled_transaction_amount"),
|
||||
])
|
||||
.drop([
|
||||
"subtransactions", "deleted","flag_name","account_name",
|
||||
"payee_name","category_name","ingestion_date"
|
||||
])
|
||||
)
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to transform the scheduled transactions DataFrame: {e}")
|
||||
return
|
||||
# Write the DataFrame to a new parquet file
|
||||
logging.info("Writing the transformed scheduled transactions DataFrame to parquet file")
|
||||
scheduled_transactions_df.write_parquet(self.config['warehouse_data_path'] + '/scheduled_transactions.parquet')
|
||||
try:
|
||||
scheduled_transactions_df.write_parquet(self.config['warehouse_data_path'] + '/scheduled_transactions.parquet')
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to write the transformed scheduled transactions DataFrame: {e}")
|
||||
|
||||
+71
-24
@@ -3,9 +3,14 @@ import time
|
||||
import json
|
||||
import logging
|
||||
import requests
|
||||
import sys
|
||||
import yaml
|
||||
from typing import Dict, Any
|
||||
import config.exit_codes as ec
|
||||
|
||||
class Ingest:
|
||||
|
||||
|
||||
def __init__(self, config: Dict[str, Any]):
|
||||
"""
|
||||
Initialize the Ingest class with the provided configuration.
|
||||
@@ -18,6 +23,8 @@ class Ingest:
|
||||
self.raw_data_path = config['raw_data_path']
|
||||
self.headers = {'Authorization': f'Bearer {self.api_token}'}
|
||||
self.knowledge_cache = self.load_knowledge_cache()
|
||||
self.MAX_RETRIES = config['REQUESTS_MAX_RETRIES']
|
||||
self.RETRY_DELAY = config['REQUESTS_RETRY_DELAY']
|
||||
self.fetch_and_cache_entity_data()
|
||||
|
||||
def load_knowledge_cache(self) -> Dict[str, Any]:
|
||||
@@ -38,8 +45,13 @@ class Ingest:
|
||||
if not os.path.exists(directory):
|
||||
os.makedirs(directory)
|
||||
entity_file = f'{directory}/{current_time}.json'
|
||||
with open(entity_file, 'w') as f:
|
||||
json.dump(data, f, indent=4)
|
||||
logging.info(f"Saving {entity} data to {entity_file}")
|
||||
try:
|
||||
with open(entity_file, 'w') as f:
|
||||
json.dump(data, f, indent=4)
|
||||
except Exception as e:
|
||||
logging.error(f"Error saving {entity} data: {e}")
|
||||
|
||||
|
||||
def update_server_knowledge_cache(self, entity: str, server_knowledge: Any):
|
||||
"""
|
||||
@@ -49,8 +61,7 @@ class Ingest:
|
||||
with open(self.knowledge_file, 'r') as f:
|
||||
knowledge_cache = json.load(f)
|
||||
except FileNotFoundError:
|
||||
# If the file does not exist, create an empty cache
|
||||
# also create the file so we can save to it later
|
||||
logging.info(f"Knowledge file not found. Creating a new one at {self.knowledge_file}. This is normal for the first run.")
|
||||
os.makedirs(os.path.dirname(self.knowledge_file), exist_ok=True)
|
||||
knowledge_cache = {}
|
||||
|
||||
@@ -71,41 +82,74 @@ class Ingest:
|
||||
if remaining_requests < 20:
|
||||
logging.warning("Approaching rate limit. Consider pausing further requests.")
|
||||
# Implement pause or delay logic here if necessary
|
||||
if remaining_requests == 1:
|
||||
logging.error("Rate limit exceeded. ending requests here and moving on with what we have.")
|
||||
return True #returning True here to break out of any more ingestions
|
||||
|
||||
else:
|
||||
logging.warning("X-Rate-Limit header is missing.")
|
||||
|
||||
def handle_response(self, response) -> bool:
|
||||
if response.status_code == 400:
|
||||
logging.error("Bad request. The request could not be understood by the API due to malformed syntax or validation errors.")
|
||||
sys.exit(ec.BAD_REQUEST)
|
||||
elif response.status_code == 401:
|
||||
logging.error("Unauthorized. Please check your API token.")
|
||||
sys.exit(ec.UNAUTHORIZED_API_TOKEN)
|
||||
elif response.status_code == 403:
|
||||
logging.error("Forbidden. Access is denied.")
|
||||
sys.exit(ec.FORBIDDEN)
|
||||
elif response.status_code == 404:
|
||||
logging.error("Not found. The specified URI does not exist.")
|
||||
sys.exit(ec.NOT_FOUND)
|
||||
elif response.status_code == 409:
|
||||
logging.error("Conflict. The resource cannot be saved due to a conflict.")
|
||||
sys.exit(ec.CONFLICT)
|
||||
elif response.status_code == 429:
|
||||
logging.error("Too many requests. You have made too many requests in a short amount of time.")
|
||||
return True
|
||||
elif response.status_code == 500:
|
||||
logging.error("Internal server error. The API experienced an unexpected error.")
|
||||
return True
|
||||
elif response.status_code == 503:
|
||||
logging.error("Service unavailable. The API is temporarily disabled or a request timeout occurred.")
|
||||
return True
|
||||
else:
|
||||
response.raise_for_status()
|
||||
return False
|
||||
|
||||
def fetch_and_cache_entity_data(self):
|
||||
"""
|
||||
Fetch and cache data for all entities.
|
||||
"""
|
||||
for entity in self.entities:
|
||||
# if we already have files in the raw data folder, we need to skip that entity
|
||||
file_path = f'data/raw/{entity}'
|
||||
if os.path.exists(file_path) and os.listdir(file_path):
|
||||
logging.warning(f"Skipping entity: {entity} as the raw data folder is not empty.")
|
||||
continue
|
||||
logging.warning(f"Raw data exists for {entity} processing any raw data we already have.")
|
||||
break # break here instead of continue as we dont want to update our server knowledge cache and potentially miss data.
|
||||
|
||||
last_knowledge = self.knowledge_cache.get(entity, 0)
|
||||
logging.debug(f'Last Knowledge of {entity.capitalize()}: {last_knowledge}')
|
||||
url = f'{self.base_url}/{self.budget_id}/{entity}'
|
||||
if last_knowledge:
|
||||
logging.info(f'Fetching {entity} data since last knowledge: {last_knowledge}')
|
||||
url = url + f'?last_knowledge_of_server={last_knowledge}'
|
||||
#logging.debug(f'Last Knowledge of {entity}: {last_knowledge}')
|
||||
logging.info(f'Fetching {entity} data since last knowledge: {last_knowledge}')
|
||||
url = f'{self.base_url}/{self.budget_id}/{entity}?last_knowledge_of_server={last_knowledge}'
|
||||
|
||||
response = requests.get(url, headers=self.headers)
|
||||
if response.status_code == 401:
|
||||
logging.error("Unauthorized. Please check your API token.")
|
||||
break
|
||||
|
||||
self.check_rate_limit(response)
|
||||
|
||||
if response.status_code == 429:
|
||||
logging.error("Rate limit exceeded. Pausing until the limit is reset.")
|
||||
# Implement pause until the limit reset logic here
|
||||
break
|
||||
for attempt in range(self.MAX_RETRIES):
|
||||
try:
|
||||
response = requests.get(url, headers=self.headers)
|
||||
should_retry = self.handle_response(response)
|
||||
if not should_retry:
|
||||
break # Exit the loop if the request is successful
|
||||
except requests.exceptions.RequestException as e:
|
||||
logging.error(f"Error fetching {entity} data (attempt {attempt + 1}/{self.MAX_RETRIES}): {e}")
|
||||
if attempt < self.MAX_RETRIES - 1:
|
||||
time.sleep(self.RETRY_DELAY) # Wait before retrying
|
||||
else:
|
||||
logging.error("Max retries reached. Exiting.")
|
||||
sys.exit(ec.REQUESTS_ERROR)
|
||||
|
||||
data = response.json()
|
||||
server_knowledge = data['data'].get('server_knowledge')
|
||||
logging.debug(f'{entity.capitalize()} Server Knowledge: {server_knowledge}')
|
||||
logging.debug(f'{entity} new server knowledge: {server_knowledge}')
|
||||
|
||||
if server_knowledge is not None and server_knowledge != last_knowledge:
|
||||
self.update_server_knowledge_cache(entity, server_knowledge)
|
||||
@@ -114,3 +158,6 @@ class Ingest:
|
||||
self.save_entity_data_to_raw(entity, entity_data)
|
||||
else:
|
||||
logging.info(f"No new data for {entity}. Skipping cache update.")
|
||||
|
||||
if self.check_rate_limit(response):
|
||||
break # break out here and continue processing the data we have.
|
||||
|
||||
+112
-63
@@ -1,8 +1,10 @@
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from typing import List, Dict, Any
|
||||
from typing import Dict, Any
|
||||
import config.exit_codes as ec
|
||||
import polars as pl
|
||||
|
||||
class RawToBase:
|
||||
@@ -14,7 +16,6 @@ class RawToBase:
|
||||
self.base_data_path = config['base_data_path']
|
||||
self.data = {}
|
||||
self.base_data = {}
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
self.process_entities()
|
||||
|
||||
def process_entities(self):
|
||||
@@ -22,7 +23,6 @@ class RawToBase:
|
||||
# check the file is in the raw data path, if not skip the entity
|
||||
folder_path = os.path.join(self.raw_data_path, entity)
|
||||
folder_contents = os.listdir(folder_path)
|
||||
# Check if the folder is empty
|
||||
if not folder_contents:
|
||||
logging.warning(f"The folder {folder_path} is empty skipping {entity}.")
|
||||
continue
|
||||
@@ -31,61 +31,94 @@ class RawToBase:
|
||||
continue
|
||||
self._load_existing_base_data(entity)
|
||||
self._combine_data(entity)
|
||||
self._resolve_duplicates(entity)
|
||||
self._save_base_data(entity)
|
||||
self._move_raw_to_processed(entity)
|
||||
if not self._resolve_duplicates(entity):
|
||||
logging.error(f"entity: {entity} failed duplicate resolution.")
|
||||
sys.exit(ec.DUPLICATE_RESOLUTION_ERROR)
|
||||
if not self._save_base_data(entity):
|
||||
logging.error(f"Skipping processing for entity: {entity} due to failed saving base data.")
|
||||
continue
|
||||
if not self._move_raw_to_processed(entity):
|
||||
logging.error(f"entity: {entity} has been processed, but we could not move the file out of the raw folder, please clear the raw folder for {entity}.")
|
||||
sys.exit(ec.MOVE_FILE_ERROR)
|
||||
|
||||
def _load_raw_data(self, entity):
|
||||
entity_path = os.path.join(self.raw_data_path, entity)
|
||||
self.data[entity] = []
|
||||
logging.debug(f"Loading data for entity: {entity} from path: {entity_path}")
|
||||
|
||||
for file_name in os.listdir(entity_path):
|
||||
if file_name.endswith('.json'):
|
||||
file_path = os.path.join(entity_path, file_name)
|
||||
logging.debug(f"Reading file: {file_path}")
|
||||
try:
|
||||
with open(file_path, 'r') as f:
|
||||
data = json.load(f)
|
||||
# Check if the data is empty
|
||||
if entity == "categories":
|
||||
# Check if any category group has categories
|
||||
has_categories = any(group.get("categories") for group in data.get("category_groups", []))
|
||||
if not has_categories:
|
||||
logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.")
|
||||
os.remove(file_path)
|
||||
return False
|
||||
else:
|
||||
if not data.get(entity, []):
|
||||
logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.")
|
||||
# delete the file as it is empty
|
||||
os.remove(file_path)
|
||||
return False
|
||||
modified_data = []
|
||||
if entity == 'categories':
|
||||
for group in data.get('category_groups', []):
|
||||
for category in group.get('categories', []):
|
||||
category['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()
|
||||
modified_data.append(category)
|
||||
else:
|
||||
for record in data.get(f'{entity}', []):
|
||||
if isinstance(record, dict):
|
||||
record['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()
|
||||
modified_data.append(record)
|
||||
else:
|
||||
modified_data.append({'record': record, 'ingestion_date': datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()})
|
||||
self.data[entity].append(modified_data)
|
||||
logging.debug(f"Successfully loaded data from file: {file_path}")
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to load data from file: {file_path}, error: {e}")
|
||||
exit(1)
|
||||
files = [f for f in os.listdir(entity_path) if f.endswith('.json')]
|
||||
|
||||
if len(files) > 1:
|
||||
logging.error(f"""More than one file found in path: {entity_path}. Skipping processing for entity: {entity}.
|
||||
recommended actions is to move the newest file(s) out, re-run main.py.
|
||||
Then move the files back in one at a time oldest to newest and run again for each file""")
|
||||
return False
|
||||
|
||||
if len(files) == 1:
|
||||
file_name = files[0]
|
||||
file_path = os.path.join(entity_path, file_name)
|
||||
logging.debug(f"Reading file: {file_path}")
|
||||
try:
|
||||
with open(file_path, 'r') as f:
|
||||
data = json.load(f)
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to load data from file: {file_path}, error: {e}")
|
||||
return False
|
||||
|
||||
if self._is_data_empty(entity, data, file_path):
|
||||
return False
|
||||
|
||||
modified_data = self._add_ingestion_date(entity, data, file_name)
|
||||
|
||||
self.data[entity].append(modified_data)
|
||||
logging.debug(f"Successfully loaded data from file: {file_path}")
|
||||
return True
|
||||
|
||||
def _is_data_empty(self, entity, data, file_path):
|
||||
logging.debug(f"Checking if data is empty for entity: {entity}")
|
||||
if entity == "categories":
|
||||
has_categories = any(group.get("categories") for group in data.get("category_groups", []))
|
||||
if not has_categories:
|
||||
logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.")
|
||||
os.remove(file_path)
|
||||
return True
|
||||
else:
|
||||
if not data.get(entity, []):
|
||||
logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.")
|
||||
os.remove(file_path)
|
||||
return True
|
||||
logging.debug(f"Data is not empty for entity: {entity}")
|
||||
return False
|
||||
|
||||
def _add_ingestion_date(self, entity, data, file_name):
|
||||
modified_data = []
|
||||
ingestion_date = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()
|
||||
|
||||
logging.debug(f"Adding ingestion date to data for entity: {entity}")
|
||||
if entity == 'categories':
|
||||
for group in data.get('category_groups', []):
|
||||
for category in group.get('categories', []):
|
||||
category['ingestion_date'] = ingestion_date
|
||||
modified_data.append(category)
|
||||
else:
|
||||
for record in data.get(f'{entity}', []):
|
||||
if isinstance(record, dict):
|
||||
record['ingestion_date'] = ingestion_date
|
||||
modified_data.append(record)
|
||||
else:
|
||||
modified_data.append({'record': record, 'ingestion_date': ingestion_date})
|
||||
logging.debug(f"Successfully added ingestion date to data for entity: {entity}")
|
||||
return modified_data
|
||||
|
||||
def _load_existing_base_data(self, entity):
|
||||
base_path = os.path.join(self.base_data_path, f'{entity}.parquet')
|
||||
if os.path.exists(base_path):
|
||||
logging.debug(f"Loading existing base data for entity: {entity} from path: {base_path}")
|
||||
self.base_data[entity] = pl.read_parquet(base_path)
|
||||
try:
|
||||
self.base_data[entity] = pl.read_parquet(base_path)
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to load existing base data for entity: {entity}, error: {e}, Creating an empty DataFrame")
|
||||
self.base_data[entity] = pl.DataFrame()
|
||||
logging.debug(f"Successfully loaded existing base data for entity: {entity}")
|
||||
else:
|
||||
self.base_data[entity] = pl.DataFrame()
|
||||
@@ -103,7 +136,6 @@ class RawToBase:
|
||||
combined_data.extend(data)
|
||||
|
||||
new_data_df = pl.DataFrame(combined_data)
|
||||
#print(new_data_df)
|
||||
|
||||
# Ensure the unique id column is preserved
|
||||
unique_id = self.primary_keys[entity]['unique_id']
|
||||
@@ -117,35 +149,52 @@ class RawToBase:
|
||||
def _resolve_duplicates(self, entity):
|
||||
logging.debug(f"Resolving duplicates for entity: {entity}")
|
||||
unique_id = self.primary_keys[entity]['unique_id']
|
||||
self.base_data[entity] = self.base_data[entity].sort(by='ingestion_date').unique(subset=unique_id, keep='first')
|
||||
try:
|
||||
self.base_data[entity] = self.base_data[entity].sort(by='ingestion_date').unique(subset=unique_id, keep='first')
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to resolve duplicates for entity: {entity}, error: {e}")
|
||||
return False
|
||||
logging.debug(f"Successfully resolved duplicates for entity: {entity}")
|
||||
return True
|
||||
|
||||
def _save_base_data(self, entity):
|
||||
os.makedirs(self.base_data_path, exist_ok=True)
|
||||
file_path = os.path.join(self.base_data_path, f'{entity}.parquet')
|
||||
self.base_data[entity].write_parquet(file_path)
|
||||
try:
|
||||
self.base_data[entity].write_parquet(file_path)
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to save base data for entity: {entity}, error: {e}")
|
||||
return False
|
||||
logging.debug(f"Saved base data for entity: {entity} to path: {file_path}")
|
||||
return True
|
||||
|
||||
def _move_raw_to_processed(self, entity):
|
||||
raw_entity_path = os.path.join(self.raw_data_path, entity)
|
||||
processed_path = os.path.join(self.processed_data_path, entity)
|
||||
|
||||
# logging.debug(f"Raw entity path: {raw_entity_path}")
|
||||
# logging.debug(f"Processed path: {processed_path}")
|
||||
|
||||
os.makedirs(processed_path, exist_ok=True)
|
||||
|
||||
for file_name in os.listdir(raw_entity_path):
|
||||
if file_name.endswith('.json'):
|
||||
raw_file_path = os.path.join(raw_entity_path, file_name)
|
||||
processed_file_path = os.path.join(processed_path, file_name)
|
||||
try:
|
||||
files = [f for f in os.listdir(raw_entity_path) if f.endswith('.json')]
|
||||
if len(files) != 1:
|
||||
logging.error(f"Expected exactly one file in path: {raw_entity_path}, but found {len(files)}")
|
||||
return False
|
||||
|
||||
logging.debug(f"Moving file: {raw_file_path} to {processed_file_path}")
|
||||
file_name = files[0]
|
||||
raw_file_path = os.path.join(raw_entity_path, file_name)
|
||||
processed_file_path = os.path.join(processed_path, file_name)
|
||||
|
||||
if os.path.exists(raw_file_path):
|
||||
os.rename(raw_file_path, processed_file_path)
|
||||
logging.debug(f"Moved file: {file_name}")
|
||||
else:
|
||||
logging.error(f"File not found: {raw_file_path}")
|
||||
logging.debug(f"Moving file: {raw_file_path} to {processed_file_path}")
|
||||
|
||||
logging.debug(f"Moved processed files for entity: {entity} to path: {processed_path}")
|
||||
os.rename(raw_file_path, processed_file_path)
|
||||
logging.debug(f"Moved file: {file_name} to processed")
|
||||
|
||||
except FileNotFoundError as e:
|
||||
logging.error(f"File not found: {e}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to move file for entity: {entity}, error: {e}")
|
||||
return False
|
||||
|
||||
logging.debug(f"Moved processed file for entity: {entity} to path: {processed_path}")
|
||||
return True
|
||||
Reference in New Issue
Block a user