start of raw_to_base

This commit is contained in:
Jake Pullen
2024-07-29 07:40:45 +01:00
parent 31b82dc1d0
commit 23458af133
4 changed files with 126 additions and 43 deletions
Binary file not shown.
+42 -30
View File
@@ -3,35 +3,47 @@ import time
import json import json
import logging import logging
import requests import requests
from typing import Dict, Any
class Ingest:
class injest: def __init__(self, ingest_info: Dict[str, Any]):
def __init__(self, injest_info): """
self.API_TOKEN = injest_info['API_TOKEN'], Initialize the Ingest class with the provided configuration.
self.BUDGET_ID = injest_info['BUDGET_ID'], """
self.headers = {'Authorization': f'Bearer {self.API_TOKEN}'}, self.api_token = ingest_info['API_TOKEN']
self.entities = ['accounts', 'categories', 'months', 'payees', 'transactions', 'scheduled_transactions'], self.budget_id = ingest_info['BUDGET_ID']
self.base_url = injest_info['base_url'], self.base_url = ingest_info['base_url']
self.knowledge_file = injest_info['knowledge_file'] self.knowledge_file = ingest_info['knowledge_file']
self.entities = ingest_info['entities']
self.headers = {'Authorization': f'Bearer {self.api_token}'}
self.knowledge_cache = self.load_knowledge_cache() self.knowledge_cache = self.load_knowledge_cache()
self.fetch_and_cache_entity_data() self.fetch_and_cache_entity_data()
def load_knowledge_cache(self): def load_knowledge_cache(self) -> Dict[str, Any]:
"""
Load the knowledge cache from the file if it exists.
"""
if os.path.exists(self.knowledge_file): if os.path.exists(self.knowledge_file):
with open(self.knowledge_file, 'r') as f: with open(self.knowledge_file, 'r') as f:
return json.load(f) return json.load(f)
return {} return {}
def update_entity_data_cache(self,entity, data): def save_entity_data_to_raw(self, entity: str, data: Dict[str, Any]):
"""
Save the data for a specific entity to a new cache file.
"""
current_time = time.strftime('%Y%m%d%H%M%S') current_time = time.strftime('%Y%m%d%H%M%S')
directory = f'data/{entity}' # Directory name is the entity's name directory = f'data/{entity}'
if not os.path.exists(directory): if not os.path.exists(directory):
os.makedirs(directory) # Create the directory if it does not exist os.makedirs(directory)
entity_file = f'{directory}/{current_time}.json' # Separate file for each entity's data entity_file = f'{directory}/{current_time}.json'
with open(entity_file, 'w') as f: with open(entity_file, 'w') as f:
json.dump(data, f, indent=4) json.dump(data, f, indent=4)
def update_server_knowledge_cache(self,entity, server_knowledge): def update_server_knowledge_cache(self, entity: str, server_knowledge: Any):
"""
Update the server knowledge cache for a specific entity.
"""
try: try:
with open(self.knowledge_file, 'r') as f: with open(self.knowledge_file, 'r') as f:
knowledge_cache = json.load(f) knowledge_cache = json.load(f)
@@ -43,21 +55,26 @@ class injest:
with open(self.knowledge_file, 'w') as f: with open(self.knowledge_file, 'w') as f:
json.dump(knowledge_cache, f, indent=4) json.dump(knowledge_cache, f, indent=4)
def check_rate_limit(self,response): def check_rate_limit(self, response: requests.Response):
"""
Check and handle the rate limit based on the response headers.
"""
rate_limit_header = response.headers.get('X-Rate-Limit') rate_limit_header = response.headers.get('X-Rate-Limit')
if rate_limit_header: if rate_limit_header:
requests_made, limit = map(int, rate_limit_header.split('/')) requests_made, limit = map(int, rate_limit_header.split('/'))
remaining_requests = limit - requests_made remaining_requests = limit - requests_made
logging.info(f"Rate Limit: {remaining_requests}/{limit} requests remaining.") logging.info(f"Rate Limit: {remaining_requests}/{limit} requests remaining.")
if remaining_requests < 20: # Arbitrary low number to start handling the limit if remaining_requests < 20:
logging.warning("Approaching rate limit. Consider pausing further requests.") logging.warning("Approaching rate limit. Consider pausing further requests.")
# Implement pause or delay logic here if necessary # Implement pause or delay logic here if necessary
else: else:
logging.warning("X-Rate-Limit header is missing.") logging.warning("X-Rate-Limit header is missing.")
def fetch_and_cache_entity_data(self): def fetch_and_cache_entity_data(self):
"""
Fetch and cache data for all entities.
"""
for entity in self.entities: for entity in self.entities:
logging.debug(f'entity type is {type(entity)}')
last_knowledge = self.knowledge_cache.get(entity, 0) last_knowledge = self.knowledge_cache.get(entity, 0)
logging.debug(f'Last Knowledge of {entity.capitalize()}: {last_knowledge}') logging.debug(f'Last Knowledge of {entity.capitalize()}: {last_knowledge}')
url = f'{self.base_url}/{self.budget_id}/{entity}' url = f'{self.base_url}/{self.budget_id}/{entity}'
@@ -66,26 +83,21 @@ class injest:
url = url + f'?last_knowledge_of_server={last_knowledge}' url = url + f'?last_knowledge_of_server={last_knowledge}'
response = requests.get(url, headers=self.headers) response = requests.get(url, headers=self.headers)
self.check_rate_limit(response) # Check and handle rate limit self.check_rate_limit(response)
if response.status_code == 429: # HTTP 429 Too Many Requests if response.status_code == 429:
logging.error("Rate limit exceeded. Pausing until the limit is reset.") logging.error("Rate limit exceeded. Pausing until the limit is reset.")
# Implement pause until the limit reset logic here # Implement pause until the limit reset logic here
break break
data = response.json() data = response.json()
server_knowledge = data['data'].get('server_knowledge') server_knowledge = data['data'].get('server_knowledge')
logging.debug(f'{entity.capitalize()} Server Knowledge: {server_knowledge}') logging.debug(f'{entity.capitalize()} Server Knowledge: {server_knowledge}')
# Check if there is new server knowledge
if server_knowledge is not None and server_knowledge != last_knowledge: if server_knowledge is not None and server_knowledge != last_knowledge:
# Update server knowledge cache
self.update_server_knowledge_cache(entity, server_knowledge) self.update_server_knowledge_cache(entity, server_knowledge)
# Update entity data cache without server knowledge
entity_data = data['data'] entity_data = data['data']
entity_data.pop('server_knowledge', None) # Remove server knowledge if exists entity_data.pop('server_knowledge', None)
self.update_entity_data_cache(entity, entity_data) self.save_entity_data_to_raw(entity, entity_data)
else: else:
logging.info(f"No new data for {entity}. Skipping cache update.") logging.info(f"No new data for {entity}. Skipping cache update.")
+10 -13
View File
@@ -1,25 +1,22 @@
import requests
import os import os
import json
import dotenv import dotenv
import logging import logging
import time from ingest import Ingest
from injest import injest
dotenv.load_dotenv() dotenv.load_dotenv()
API_TOKEN = os.getenv('API_TOKEN') API_TOKEN = os.getenv('API_TOKEN')
BUDGET_ID = os.getenv('BUDGET_ID') BUDGET_ID = os.getenv('BUDGET_ID')
headers = {'Authorization': f'Bearer {API_TOKEN}'}
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
injest_info = {} entities = ['accounts', 'categories', 'months', 'payees', 'transactions', 'scheduled_transactions']
#entities = ['accounts', 'categories', 'months', 'payees', 'transactions', 'scheduled_transactions'] ingest_info = {}
#injest_info['entities'] = entities
injest_info['base_url'] = 'https://api.ynab.com/v1/budgets' ingest_info['entities'] = entities
injest_info['knowledge_file'] = 'server_knowledge_cache.json' ingest_info['base_url'] = 'https://api.ynab.com/v1/budgets'
injest_info['API_TOKEN'] = API_TOKEN ingest_info['knowledge_file'] = 'server_knowledge_cache.json'
injest_info['BUDGET_ID'] = BUDGET_ID ingest_info['API_TOKEN'] = API_TOKEN
ingest_info['BUDGET_ID'] = BUDGET_ID
injest(injest_info)#.fetch_and_cache_entity_data() Ingest(ingest_info)
+74
View File
@@ -0,0 +1,74 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"ename": "",
"evalue": "",
"output_type": "error",
"traceback": [
"\u001b[1;31mRunning cells with 'Python 3.12.4' requires the ipykernel package.\n",
"\u001b[1;31mRun the following command to install 'ipykernel' into the Python environment. \n",
"\u001b[1;31mCommand: '/bin/python3.12 -m pip install ipykernel -U --user --force-reinstall'"
]
}
],
"source": [
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.functions import *\n",
"from pyspark.sql.types import *\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"ename": "",
"evalue": "",
"output_type": "error",
"traceback": [
"\u001b[1;31mRunning cells with 'Python 3.12.4' requires the ipykernel package.\n",
"\u001b[1;31mRun the following command to install 'ipykernel' into the Python environment. \n",
"\u001b[1;31mCommand: '/bin/python3.12 -m pip install ipykernel -U --user --force-reinstall'"
]
}
],
"source": [
"\n",
"spark = SparkSession.builder.appName(\"finance_dwh\").config(\"spark.memory.offHeap.enabled\",\"true\").config(\"spark.memory.offHeap.size\",\"10g\").getOrCreate()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\n",
"accounts_data = spark.read.json(\"data/20240728094708.json\")\n",
"accounts_data.printSchema()\n",
"#accounts_data.show()\n",
"\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.12.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}