starting to build the warehouse

This commit is contained in:
Jake Pullen
2024-08-09 14:23:52 +01:00
parent 33c9496eb0
commit 165464820b
9 changed files with 67 additions and 7 deletions
View File
+60
View File
@@ -0,0 +1,60 @@
# This file is used to define the dimension table for the accounts table
# The accounts table contains information about the accounts in the budget
# The accounts table has the following columns:
# - id: the unique identifier for the account
# - name: the name of the account
# - type: the type of the account (e.g. checking, savings, credit card)
# - on_budget: a boolean indicating whether the account is on budget
# - closed: a boolean indicating whether the account is closed
# - note: a note associated with the account
# - balance: the current balance of the account
# - cleared_balance: the cleared balance of the account
# - uncleared_balance: the uncleared balance of the account
# - deleted: a boolean indicating whether the account has been deleted
# the below is mega tbc
import pandas as pd
from datetime import datetime
def handle_scd_type_2(dim_accounts_df, new_data_df):
current_date = datetime.now().date()
for index, new_row in new_data_df.iterrows():
account_id = new_row['account_id']
existing_rows = dim_accounts_df[dim_accounts_df['account_id'] == account_id]
if existing_rows.empty:
# Insert new record
new_row['start_date'] = current_date
new_row['end_date'] = None
new_row['is_current'] = True
dim_accounts_df = dim_accounts_df.append(new_row, ignore_index=True)
else:
current_row = existing_rows[existing_rows['is_current'] == True].iloc[0]
if not new_row.equals(current_row.drop(['surrogate_key', 'start_date', 'end_date', 'is_current'])):
# Update existing record to set is_current to False and end_date
dim_accounts_df.loc[current_row.name, 'is_current'] = False
dim_accounts_df.loc[current_row.name, 'end_date'] = current_date
# Insert new record
new_row['start_date'] = current_date
new_row['end_date'] = None
new_row['is_current'] = True
dim_accounts_df = dim_accounts_df.append(new_row, ignore_index=True)
return dim_accounts_df
# Example usage
dim_accounts_df = pd.DataFrame(columns=[
'surrogate_key', 'account_id', 'account_name', 'account_type', 'on_budget', 'closed', 'note',
'balance', 'cleared_balance', 'uncleared_balance', 'deleted', 'start_date', 'end_date', 'is_current'
])
new_data_df = pd.DataFrame([
{'account_id': 1, 'account_name': 'Checking Account', 'account_type': 'checking', 'on_budget': True, 'closed': False, 'note': '', 'balance': 1000.00, 'cleared_balance': 1000.00, 'uncleared_balance': 0.00, 'deleted': False},
{'account_id': 2, 'account_name': 'Savings Account', 'account_type': 'savings', 'on_budget': True, 'closed': False, 'note': '', 'balance': 5000.00, 'cleared_balance': 5000.00, 'uncleared_balance': 0.00, 'deleted': False}
])
dim_accounts_df = handle_scd_type_2(dim_accounts_df, new_data_df)
+115
View File
@@ -0,0 +1,115 @@
import os
import time
import json
import logging
import requests
from typing import Dict, Any
class Ingest:
def __init__(self, config: Dict[str, Any]):
"""
Initialize the Ingest class with the provided configuration.
"""
self.api_token = config['API_TOKEN']
self.budget_id = config['BUDGET_ID']
self.base_url = config['base_url']
self.knowledge_file = config['knowledge_file']
self.entities = config['entities']
self.headers = {'Authorization': f'Bearer {self.api_token}'}
self.knowledge_cache = self.load_knowledge_cache()
self.fetch_and_cache_entity_data()
def load_knowledge_cache(self) -> Dict[str, Any]:
"""
Load the knowledge cache from the file if it exists.
"""
if os.path.exists(self.knowledge_file):
with open(self.knowledge_file, 'r') as f:
return json.load(f)
return {}
def save_entity_data_to_raw(self, entity: str, data: Dict[str, Any]):
"""
Save the data for a specific entity to a new cache file.
"""
current_time = time.strftime('%Y%m%d%H%M%S')
directory = f'data/raw/{entity}'
if not os.path.exists(directory):
os.makedirs(directory)
entity_file = f'{directory}/{current_time}.json'
with open(entity_file, 'w') as f:
json.dump(data, f, indent=4)
def update_server_knowledge_cache(self, entity: str, server_knowledge: Any):
"""
Update the server knowledge cache for a specific entity.
"""
try:
with open(self.knowledge_file, 'r') as f:
knowledge_cache = json.load(f)
except FileNotFoundError:
# If the file does not exist, create an empty cache
# also create the file so we can save to it later
os.makedirs(os.path.dirname(self.knowledge_file), exist_ok=True)
knowledge_cache = {}
knowledge_cache[entity] = server_knowledge
with open(self.knowledge_file, 'w') as f:
json.dump(knowledge_cache, f, indent=4)
def check_rate_limit(self, response: requests.Response):
"""
Check and handle the rate limit based on the response headers.
"""
rate_limit_header = response.headers.get('X-Rate-Limit')
if rate_limit_header:
requests_made, limit = map(int, rate_limit_header.split('/'))
remaining_requests = limit - requests_made
logging.info(f"Rate Limit: {remaining_requests}/{limit} requests remaining.")
if remaining_requests < 20:
logging.warning("Approaching rate limit. Consider pausing further requests.")
# Implement pause or delay logic here if necessary
else:
logging.warning("X-Rate-Limit header is missing.")
def fetch_and_cache_entity_data(self):
"""
Fetch and cache data for all entities.
"""
for entity in self.entities:
# if we already have files in the raw data folder, we need to skip that entity
file_path = f'data/raw/{entity}'
if os.path.exists(file_path) and os.listdir(file_path):
logging.warning(f"Skipping entity: {entity} as the raw data folder is not empty.")
continue
last_knowledge = self.knowledge_cache.get(entity, 0)
logging.debug(f'Last Knowledge of {entity.capitalize()}: {last_knowledge}')
url = f'{self.base_url}/{self.budget_id}/{entity}'
if last_knowledge:
logging.info(f'Fetching {entity} data since last knowledge: {last_knowledge}')
url = url + f'?last_knowledge_of_server={last_knowledge}'
response = requests.get(url, headers=self.headers)
if response.status_code == 401:
logging.error("Unauthorized. Please check your API token.")
break
self.check_rate_limit(response)
if response.status_code == 429:
logging.error("Rate limit exceeded. Pausing until the limit is reset.")
# Implement pause until the limit reset logic here
break
data = response.json()
server_knowledge = data['data'].get('server_knowledge')
logging.debug(f'{entity.capitalize()} Server Knowledge: {server_knowledge}')
if server_knowledge is not None and server_knowledge != last_knowledge:
self.update_server_knowledge_cache(entity, server_knowledge)
entity_data = data['data']
entity_data.pop('server_knowledge', None)
self.save_entity_data_to_raw(entity, entity_data)
else:
logging.info(f"No new data for {entity}. Skipping cache update.")
+151
View File
@@ -0,0 +1,151 @@
import os
import json
import logging
from datetime import datetime
from typing import List, Dict, Any
import polars as pl
class RawToBase:
def __init__(self, config: Dict[str, Any]):
self.entities = config['entities']
self.primary_keys = config['primary_keys']
self.raw_data_path = 'data/raw'
self.processed_data_path = 'data/processed'
self.base_data_path = 'data/base'
self.data = {}
self.base_data = {}
logging.basicConfig(level=logging.DEBUG)
self.process_entities()
def process_entities(self):
for entity in self.entities:
# check the file is in the raw data path, if not skip the entity
folder_path = os.path.join(self.raw_data_path, entity)
folder_contents = os.listdir(folder_path)
# Check if the folder is empty
if not folder_contents:
logging.warning(f"The folder {folder_path} is empty skipping {entity}.")
continue
if not self._load_raw_data(entity):
logging.warning(f"Skipping processing for entity: {entity} due to empty data.")
continue
self._load_existing_base_data(entity)
self._combine_data(entity)
self._resolve_duplicates(entity)
self._save_base_data(entity)
self._move_raw_to_processed(entity)
def _load_raw_data(self, entity):
entity_path = os.path.join(self.raw_data_path, entity)
self.data[entity] = []
logging.debug(f"Loading data for entity: {entity} from path: {entity_path}")
for file_name in os.listdir(entity_path):
if file_name.endswith('.json'):
file_path = os.path.join(entity_path, file_name)
logging.debug(f"Reading file: {file_path}")
try:
with open(file_path, 'r') as f:
data = json.load(f)
# Check if the data is empty
if entity == "categories":
# Check if any category group has categories
has_categories = any(group.get("categories") for group in data.get("category_groups", []))
if not has_categories:
logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.")
os.remove(file_path)
return False
else:
if not data.get(entity, []):
logging.warning(f"Received empty data for entity: {entity} in file: {file_path}, deleting file.")
# delete the file as it is empty
os.remove(file_path)
return False
modified_data = []
if entity == 'categories':
for group in data.get('category_groups', []):
for category in group.get('categories', []):
category['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()
modified_data.append(category)
else:
for record in data.get(f'{entity}', []):
if isinstance(record, dict):
record['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()
modified_data.append(record)
else:
modified_data.append({'record': record, 'ingestion_date': datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()})
self.data[entity].append(modified_data)
logging.debug(f"Successfully loaded data from file: {file_path}")
except Exception as e:
logging.error(f"Failed to load data from file: {file_path}, error: {e}")
exit(1)
return True
def _load_existing_base_data(self, entity):
base_path = os.path.join(self.base_data_path, f'{entity}.parquet')
if os.path.exists(base_path):
logging.debug(f"Loading existing base data for entity: {entity} from path: {base_path}")
self.base_data[entity] = pl.read_parquet(base_path)
logging.debug(f"Successfully loaded existing base data for entity: {entity}")
else:
self.base_data[entity] = pl.DataFrame()
logging.debug(f"No existing base data found for entity: {entity}, starting with an empty DataFrame")
def _combine_data(self, entity):
logging.debug(f"Combining data for entity: {entity}")
combined_data = []
if entity == 'categories':
for data in self.data[entity]:
for category in data:
combined_data.append(category)
else:
for data in self.data[entity]:
combined_data.extend(data)
new_data_df = pl.DataFrame(combined_data)
#print(new_data_df)
# Ensure the unique id column is preserved
unique_id = self.primary_keys[entity]['unique_id']
if unique_id not in new_data_df.columns:
logging.error(f"Unique ID column '{unique_id}' not found in the combined data for entity: {entity}")
exit(1)
self.base_data[entity] = new_data_df
logging.debug(f"Successfully combined data for entity: {entity}")
def _resolve_duplicates(self, entity):
logging.debug(f"Resolving duplicates for entity: {entity}")
unique_id = self.primary_keys[entity]['unique_id']
self.base_data[entity] = self.base_data[entity].sort(by='ingestion_date').unique(subset=unique_id, keep='first')
logging.debug(f"Successfully resolved duplicates for entity: {entity}")
def _save_base_data(self, entity):
os.makedirs(self.base_data_path, exist_ok=True)
file_path = os.path.join(self.base_data_path, f'{entity}.parquet')
self.base_data[entity].write_parquet(file_path)
logging.debug(f"Saved base data for entity: {entity} to path: {file_path}")
def _move_raw_to_processed(self, entity):
raw_entity_path = os.path.join(self.raw_data_path, entity)
processed_path = os.path.join(self.processed_data_path, entity)
# logging.debug(f"Raw entity path: {raw_entity_path}")
# logging.debug(f"Processed path: {processed_path}")
os.makedirs(processed_path, exist_ok=True)
for file_name in os.listdir(raw_entity_path):
if file_name.endswith('.json'):
raw_file_path = os.path.join(raw_entity_path, file_name)
processed_file_path = os.path.join(processed_path, file_name)
logging.debug(f"Moving file: {raw_file_path} to {processed_file_path}")
if os.path.exists(raw_file_path):
os.rename(raw_file_path, processed_file_path)
logging.debug(f"Moved file: {file_name}")
else:
logging.error(f"File not found: {raw_file_path}")
logging.debug(f"Moved processed files for entity: {entity} to path: {processed_path}")