From 23458af13331ed4c5b9fd1641c450f57e4864cd3 Mon Sep 17 00:00:00 2001 From: Jake Pullen Date: Mon, 29 Jul 2024 07:40:45 +0100 Subject: [PATCH] start of raw_to_base --- __pycache__/injest.cpython-310.pyc | Bin 3420 -> 3380 bytes injest.py => ingest.py | 72 ++++++++++++++++------------ main.py | 23 ++++----- raw_to_base.ipynb | 74 +++++++++++++++++++++++++++++ 4 files changed, 126 insertions(+), 43 deletions(-) rename injest.py => ingest.py (61%) create mode 100644 raw_to_base.ipynb diff --git a/__pycache__/injest.cpython-310.pyc b/__pycache__/injest.cpython-310.pyc index 340d397d3d76d0049b66c72df460227dc92e5a20..2c14668961e1d9dae6e9f84a5f19a954e02083f7 100644 GIT binary patch delta 1282 zcmca3wMB|ApO=@5fq{X+xO!QdxZp&-2QEGdksSt!)%7ROqq<6Z5U0IIg%2KQ{zjEa&GZu=jE5@q^6{& z#;0ZGq~794%`3?)$xJPtoX;q!$yvnAz`$^eGpRHsJ+&l0Go^@?fq|ij4MebmWY{xO z6H`))iYH4j#V`s?E@iS4;9`V9Ax17n9;V5MnI@TnblhUeFG$V1#gbK=pLdHTCqFSo zlc|UgWC~+ikpRd@Fd+zHaWgP5h-~g+=3~?m0r5pagcwLIM{Z(vYD#8NagjJkOadgv zl2V#mpvhjuH~9#Qg@6=D9BdSr5S+}zD$6J}S&Q|1y)Z~h21Lk$2ss7@22D1IK_C|v zflMos1Q{vlmYI|4mtW$RUz(TVT2z!@RHO`2%?fsLkqStSKyhkOS!z){JP3=xR`Y<2 z23rj#K>oQUTUwBkSTcD5n*^isB6KtQbsybqFM9 zq$X#_7bTXY#^+?_W|rtrKE>f%AHvAMP{Oc)F@>>&sfHnoxtVb`!(66?jEoE*o(n_l z%vwgUJW~l%4I_#y%K}zVLRrXE%LG=zQp31_Z6U*Kh7{Hsrr8X0S)it{&t^zrt6|9E z0Mj7zS!9v&XBoF-clI14L)Oa{9j6i`KWATCHRilHnF3`K&It$39HxDFzP delta 1303 zcmdlYbw`RXpO=@5fq{WRPP2T{L;LV;#*wFi6yD&`9+zj#kbgU^YcnFif^$MBvz)T7T@A2 zDN4*MPD}>rF1{sJoSc!GQks*R5|1Lzk(5}R8edwJbBixKFTXq|H6=YYJ}omRRa29* zh?#+bp@@Znf#DW=MrvY8YEkhmj?}!8%#zI1;vyc9BrgL4!{pP9F^r;L$oy1qeHk)BCGe}PjTM9!dW6|Uqwi>2v=Ay|p3=0Hm7;D%TGBGkhOsHkAVP7C< zn8KUF02X9eAXLLx!wzB>GJ{>s4|cTx#K1ykkRvn&{U%#*=`$)!&fxN9n*!m)nYl`eaVj#^(2EvnF5fcLg0|x^GLovuF91I+c99$f1 r9Go0HJUn1IO|~L%u2Th>0`?*( Dict[str, Any]: + """ + Load the knowledge cache from the file if it exists. + """ if os.path.exists(self.knowledge_file): with open(self.knowledge_file, 'r') as f: return json.load(f) return {} - - def update_entity_data_cache(self,entity, data): + + def save_entity_data_to_raw(self, entity: str, data: Dict[str, Any]): + """ + Save the data for a specific entity to a new cache file. + """ current_time = time.strftime('%Y%m%d%H%M%S') - directory = f'data/{entity}' # Directory name is the entity's name + directory = f'data/{entity}' if not os.path.exists(directory): - os.makedirs(directory) # Create the directory if it does not exist - entity_file = f'{directory}/{current_time}.json' # Separate file for each entity's data + os.makedirs(directory) + entity_file = f'{directory}/{current_time}.json' with open(entity_file, 'w') as f: json.dump(data, f, indent=4) - def update_server_knowledge_cache(self,entity, server_knowledge): + def update_server_knowledge_cache(self, entity: str, server_knowledge: Any): + """ + Update the server knowledge cache for a specific entity. + """ try: with open(self.knowledge_file, 'r') as f: knowledge_cache = json.load(f) @@ -43,21 +55,26 @@ class injest: with open(self.knowledge_file, 'w') as f: json.dump(knowledge_cache, f, indent=4) - def check_rate_limit(self,response): + def check_rate_limit(self, response: requests.Response): + """ + Check and handle the rate limit based on the response headers. + """ rate_limit_header = response.headers.get('X-Rate-Limit') if rate_limit_header: requests_made, limit = map(int, rate_limit_header.split('/')) remaining_requests = limit - requests_made logging.info(f"Rate Limit: {remaining_requests}/{limit} requests remaining.") - if remaining_requests < 20: # Arbitrary low number to start handling the limit + if remaining_requests < 20: logging.warning("Approaching rate limit. Consider pausing further requests.") # Implement pause or delay logic here if necessary else: logging.warning("X-Rate-Limit header is missing.") - def fetch_and_cache_entity_data(self): + def fetch_and_cache_entity_data(self): + """ + Fetch and cache data for all entities. + """ for entity in self.entities: - logging.debug(f'entity type is {type(entity)}') last_knowledge = self.knowledge_cache.get(entity, 0) logging.debug(f'Last Knowledge of {entity.capitalize()}: {last_knowledge}') url = f'{self.base_url}/{self.budget_id}/{entity}' @@ -66,26 +83,21 @@ class injest: url = url + f'?last_knowledge_of_server={last_knowledge}' response = requests.get(url, headers=self.headers) - self.check_rate_limit(response) # Check and handle rate limit + self.check_rate_limit(response) - if response.status_code == 429: # HTTP 429 Too Many Requests + if response.status_code == 429: logging.error("Rate limit exceeded. Pausing until the limit is reset.") # Implement pause until the limit reset logic here break data = response.json() - server_knowledge = data['data'].get('server_knowledge') logging.debug(f'{entity.capitalize()} Server Knowledge: {server_knowledge}') - # Check if there is new server knowledge if server_knowledge is not None and server_knowledge != last_knowledge: - # Update server knowledge cache self.update_server_knowledge_cache(entity, server_knowledge) - - # Update entity data cache without server knowledge entity_data = data['data'] - entity_data.pop('server_knowledge', None) # Remove server knowledge if exists - self.update_entity_data_cache(entity, entity_data) + entity_data.pop('server_knowledge', None) + self.save_entity_data_to_raw(entity, entity_data) else: - logging.info(f"No new data for {entity}. Skipping cache update.") + logging.info(f"No new data for {entity}. Skipping cache update.") \ No newline at end of file diff --git a/main.py b/main.py index 9bb9ead..eed884a 100644 --- a/main.py +++ b/main.py @@ -1,25 +1,22 @@ -import requests import os -import json import dotenv import logging -import time -from injest import injest +from ingest import Ingest dotenv.load_dotenv() API_TOKEN = os.getenv('API_TOKEN') BUDGET_ID = os.getenv('BUDGET_ID') -headers = {'Authorization': f'Bearer {API_TOKEN}'} logging.basicConfig(level=logging.DEBUG) -injest_info = {} -#entities = ['accounts', 'categories', 'months', 'payees', 'transactions', 'scheduled_transactions'] -#injest_info['entities'] = entities -injest_info['base_url'] = 'https://api.ynab.com/v1/budgets' -injest_info['knowledge_file'] = 'server_knowledge_cache.json' -injest_info['API_TOKEN'] = API_TOKEN -injest_info['BUDGET_ID'] = BUDGET_ID +entities = ['accounts', 'categories', 'months', 'payees', 'transactions', 'scheduled_transactions'] +ingest_info = {} + +ingest_info['entities'] = entities +ingest_info['base_url'] = 'https://api.ynab.com/v1/budgets' +ingest_info['knowledge_file'] = 'server_knowledge_cache.json' +ingest_info['API_TOKEN'] = API_TOKEN +ingest_info['BUDGET_ID'] = BUDGET_ID -injest(injest_info)#.fetch_and_cache_entity_data() +Ingest(ingest_info) diff --git a/raw_to_base.ipynb b/raw_to_base.ipynb new file mode 100644 index 0000000..5171401 --- /dev/null +++ b/raw_to_base.ipynb @@ -0,0 +1,74 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "ename": "", + "evalue": "", + "output_type": "error", + "traceback": [ + "\u001b[1;31mRunning cells with 'Python 3.12.4' requires the ipykernel package.\n", + "\u001b[1;31mRun the following command to install 'ipykernel' into the Python environment. \n", + "\u001b[1;31mCommand: '/bin/python3.12 -m pip install ipykernel -U --user --force-reinstall'" + ] + } + ], + "source": [ + "from pyspark.sql import SparkSession\n", + "from pyspark.sql.functions import *\n", + "from pyspark.sql.types import *\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "ename": "", + "evalue": "", + "output_type": "error", + "traceback": [ + "\u001b[1;31mRunning cells with 'Python 3.12.4' requires the ipykernel package.\n", + "\u001b[1;31mRun the following command to install 'ipykernel' into the Python environment. \n", + "\u001b[1;31mCommand: '/bin/python3.12 -m pip install ipykernel -U --user --force-reinstall'" + ] + } + ], + "source": [ + "\n", + "spark = SparkSession.builder.appName(\"finance_dwh\").config(\"spark.memory.offHeap.enabled\",\"true\").config(\"spark.memory.offHeap.size\",\"10g\").getOrCreate()\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "accounts_data = spark.read.json(\"data/20240728094708.json\")\n", + "accounts_data.printSchema()\n", + "#accounts_data.show()\n", + "\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.12.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}