Bug fix no more request limit (#18)

* added tests

* Removed method no longer used due to YNAB api Changes
This commit is contained in:
Jake-Pullen
2025-04-04 18:49:44 +01:00
committed by GitHub
parent d155a4c907
commit 5af82e5753
8 changed files with 432 additions and 63 deletions
+2 -1
View File
@@ -8,4 +8,5 @@ __pycache__/*
*/__pycache__/* */__pycache__/*
*.pbix *.pbix
/logs/* /logs/*
test.py .vscode/*
*.coverage
View File
+15 -16
View File
@@ -24,6 +24,18 @@ def set_up_logging():
queue_handler.listener.start() queue_handler.listener.start()
atexit.register(queue_handler.listener.stop) atexit.register(queue_handler.listener.stop)
def load_config():
try:
with open('config/config.yaml', 'r') as file:
config = yaml.safe_load(file)
return config
except FileNotFoundError:
logging.error('config.yaml file not found')
sys.exit(ec.MISSING_CONFIG_FILE)
except yaml.YAMLError as e:
logging.error(f'Error loading config.yaml: {e}')
sys.exit(ec.CORRUPTED_CONFIG_FILE)
logger = logging.getLogger("data_pipeline_for_ynab") logger = logging.getLogger("data_pipeline_for_ynab")
os.makedirs('logs', exist_ok=True) os.makedirs('logs', exist_ok=True)
set_up_logging() set_up_logging()
@@ -34,27 +46,14 @@ dotenv.load_dotenv()
API_TOKEN = os.getenv('API_TOKEN') API_TOKEN = os.getenv('API_TOKEN')
BUDGET_ID = os.getenv('BUDGET_ID') BUDGET_ID = os.getenv('BUDGET_ID')
if not API_TOKEN or not BUDGET_ID: if not API_TOKEN or not BUDGET_ID:
logging.error('API_TOKEN or BUDGET_ID is not set in .env file') logging.error('API_TOKEN or BUDGET_ID is not set in .env file')
sys.exit(ec.MISSING_ENV_VARS) sys.exit(ec.MISSING_ENV_VARS)
try:
with open('config/config.yaml', 'r') as file:
config = yaml.safe_load(file)
except FileNotFoundError:
logging.error('config.yaml file not found')
sys.exit(ec.MISSING_CONFIG_FILE)
except yaml.YAMLError as e:
logging.error(f'Error loading config.yaml: {e}')
sys.exit(ec.CORRUPTED_CONFIG_FILE)
config['API_TOKEN'] = API_TOKEN
config['BUDGET_ID'] = BUDGET_ID
#sys.exit(ec.SUCCESS)
if __name__ == '__main__': if __name__ == '__main__':
config = load_config()
config['API_TOKEN'] = API_TOKEN
config['BUDGET_ID'] = BUDGET_ID
try: try:
pipeline_main(config) pipeline_main(config)
+32 -44
View File
@@ -4,13 +4,11 @@ import json
import logging import logging
import requests import requests
import sys import sys
import yaml
from typing import Dict, Any from typing import Dict, Any
import config.exit_codes as ec import config.exit_codes as ec
class Ingest: class Ingest:
def __init__(self, config: Dict[str, Any]): def __init__(self, config: Dict[str, Any]):
""" """
Initialize the Ingest class with the provided configuration. Initialize the Ingest class with the provided configuration.
@@ -22,19 +20,9 @@ class Ingest:
self.entities = config['entities'] self.entities = config['entities']
self.raw_data_path = config['raw_data_path'] self.raw_data_path = config['raw_data_path']
self.headers = {'Authorization': f'Bearer {self.api_token}'} self.headers = {'Authorization': f'Bearer {self.api_token}'}
self.knowledge_cache = self.load_knowledge_cache()
self.MAX_RETRIES = config['REQUESTS_MAX_RETRIES'] self.MAX_RETRIES = config['REQUESTS_MAX_RETRIES']
self.RETRY_DELAY = config['REQUESTS_RETRY_DELAY'] self.RETRY_DELAY = config['REQUESTS_RETRY_DELAY']
self.fetch_and_cache_entity_data()
def load_knowledge_cache(self) -> Dict[str, Any]:
"""
Load the knowledge cache from the file if it exists.
"""
if os.path.exists(self.knowledge_file):
with open(self.knowledge_file, 'r') as f:
return json.load(f)
return {}
def save_entity_data_to_raw(self, entity: str, data: Dict[str, Any]): def save_entity_data_to_raw(self, entity: str, data: Dict[str, Any]):
""" """
@@ -50,8 +38,18 @@ class Ingest:
with open(entity_file, 'w') as f: with open(entity_file, 'w') as f:
json.dump(data, f, indent=4) json.dump(data, f, indent=4)
except Exception as e: except Exception as e:
logging.error(f"Error saving {entity} data: {e}") logging.error(f"Failed to save data for {entity} to {entity_file}")
raise e
def load_knowledge_cache(self) -> Dict[str, Any]:
"""
Load the knowledge cache from the file if it exists.
"""
if not os.path.exists(self.knowledge_file):
os.makedirs(os.path.dirname(self.knowledge_file),exist_ok=True)
return {}
with open(self.knowledge_file, 'r') as f:
return json.load(f)
def update_server_knowledge_cache(self, entity: str, server_knowledge: Any): def update_server_knowledge_cache(self, entity: str, server_knowledge: Any):
""" """
@@ -64,31 +62,21 @@ class Ingest:
logging.info(f"Knowledge file not found. Creating a new one at {self.knowledge_file}. This is normal for the first run.") logging.info(f"Knowledge file not found. Creating a new one at {self.knowledge_file}. This is normal for the first run.")
os.makedirs(os.path.dirname(self.knowledge_file), exist_ok=True) os.makedirs(os.path.dirname(self.knowledge_file), exist_ok=True)
knowledge_cache = {} knowledge_cache = {}
knowledge_cache[entity] = server_knowledge knowledge_cache[entity] = server_knowledge
with open(self.knowledge_file, 'w') as f: with open(self.knowledge_file, 'w') as f:
json.dump(knowledge_cache, f, indent=4) json.dump(knowledge_cache, f, indent=4)
def check_rate_limit(self, response: requests.Response): knowledge_cache = self.load_knowledge_cache()
""" knowledge_cache[entity] = server_knowledge
Check and handle the rate limit based on the response headers. try:
""" with open(self.knowledge_file, 'w') as f:
rate_limit_header = response.headers.get('X-Rate-Limit') json.dump(knowledge_cache, f, indent=4)
if rate_limit_header: except Exception as e:
requests_made, limit = map(int, rate_limit_header.split('/')) logging.error(f"Failed to update knowledge cache for {entity} in {self.knowledge_file}")
remaining_requests = limit - requests_made raise e
logging.info(f"Rate Limit: {remaining_requests}/{limit} requests remaining.")
if remaining_requests < 20:
logging.warning("Approaching rate limit. Consider pausing further requests.")
# Implement pause or delay logic here if necessary
if remaining_requests == 1:
logging.error("Rate limit exceeded. ending requests here and moving on with what we have.")
return True #returning True here to break out of any more ingestions
else:
logging.warning("X-Rate-Limit header is missing.")
def handle_response(self, response) -> bool: def handle_response(self, response) -> bool:
if response.status_code == 400: if response.status_code == 400:
logging.error("Bad request. The request could not be understood by the API due to malformed syntax or validation errors.") logging.error("Bad request. The request could not be understood by the API due to malformed syntax or validation errors.")
@@ -100,14 +88,14 @@ class Ingest:
logging.error("Forbidden. Access is denied.") logging.error("Forbidden. Access is denied.")
sys.exit(ec.FORBIDDEN) sys.exit(ec.FORBIDDEN)
elif response.status_code == 404: elif response.status_code == 404:
logging.error("Not found. The specified URI does not exist.") logging.error("Not found. The specified URL does not exist.")
sys.exit(ec.NOT_FOUND) sys.exit(ec.NOT_FOUND)
elif response.status_code == 409: elif response.status_code == 409:
logging.error("Conflict. The resource cannot be saved due to a conflict.") logging.error("Conflict. The resource cannot be saved due to a conflict.")
sys.exit(ec.CONFLICT) sys.exit(ec.CONFLICT)
elif response.status_code == 429: elif response.status_code == 429:
logging.error("Too many requests. You have made too many requests in a short amount of time.") logging.error("Too many requests. You have made too many requests in a short amount of time.")
return True return True
elif response.status_code == 500: elif response.status_code == 500:
logging.error("Internal server error. The API experienced an unexpected error.") logging.error("Internal server error. The API experienced an unexpected error.")
return True return True
@@ -118,7 +106,7 @@ class Ingest:
response.raise_for_status() response.raise_for_status()
return False return False
def fetch_and_cache_entity_data(self): def start_ingestion(self):
""" """
Fetch and cache data for all entities. Fetch and cache data for all entities.
""" """
@@ -128,11 +116,13 @@ class Ingest:
logging.warning(f"Raw data exists for {entity} processing any raw data we already have.") logging.warning(f"Raw data exists for {entity} processing any raw data we already have.")
break # break here instead of continue as we dont want to update our server knowledge cache and potentially miss data. break # break here instead of continue as we dont want to update our server knowledge cache and potentially miss data.
last_knowledge = self.knowledge_cache.get(entity, 0) knowledge_cache = self.load_knowledge_cache()
last_knowledge = knowledge_cache.get(entity, 0)
#logging.debug(f'Last Knowledge of {entity}: {last_knowledge}') #logging.debug(f'Last Knowledge of {entity}: {last_knowledge}')
logging.info(f'Fetching {entity} data since last knowledge: {last_knowledge}') logging.info(f'Fetching {entity} data since last knowledge: {last_knowledge}')
url = f'{self.base_url}/{self.budget_id}/{entity}?last_knowledge_of_server={last_knowledge}' url = f'{self.base_url}/{self.budget_id}/{entity}?last_knowledge_of_server={last_knowledge}'
response = None
for attempt in range(self.MAX_RETRIES): for attempt in range(self.MAX_RETRIES):
try: try:
response = requests.get(url, headers=self.headers) response = requests.get(url, headers=self.headers)
@@ -146,11 +136,12 @@ class Ingest:
else: else:
logging.error("Max retries reached. Exiting.") logging.error("Max retries reached. Exiting.")
sys.exit(ec.REQUESTS_ERROR) sys.exit(ec.REQUESTS_ERROR)
data = response.json() data = response.json()
logging.debug(f'response data: {data}')
server_knowledge = data['data'].get('server_knowledge') server_knowledge = data['data'].get('server_knowledge')
logging.debug(f'{entity} new server knowledge: {server_knowledge}') logging.debug(f'{entity} new server knowledge: {server_knowledge}')
if server_knowledge is not None and server_knowledge != last_knowledge: if server_knowledge is not None and server_knowledge != last_knowledge:
self.update_server_knowledge_cache(entity, server_knowledge) self.update_server_knowledge_cache(entity, server_knowledge)
entity_data = data['data'] entity_data = data['data']
@@ -158,6 +149,3 @@ class Ingest:
self.save_entity_data_to_raw(entity, entity_data) self.save_entity_data_to_raw(entity, entity_data)
else: else:
logging.info(f"No new data for {entity}. Skipping cache update.") logging.info(f"No new data for {entity}. Skipping cache update.")
if self.check_rate_limit(response):
break # break out here and continue processing the data we have.
+2 -1
View File
@@ -12,7 +12,8 @@ def pipeline_main(config):
'''Run the data pipeline''' '''Run the data pipeline'''
logging.info('Starting data pipeline') logging.info('Starting data pipeline')
Ingest(config) ingest = Ingest(config)
ingest.start_ingestion()
RawToBase(config) RawToBase(config)
DimAccounts(config) DimAccounts(config)
DimCategories(config) DimCategories(config)
+3 -1
View File
@@ -6,4 +6,6 @@ pyyaml
dash dash
pandas pandas
pyarrow pyarrow
dash-bootstrap-components dash-bootstrap-components
# testing requirements below
pytest
+307
View File
@@ -0,0 +1,307 @@
import pytest
from unittest.mock import patch, mock_open, MagicMock
import json
import os
from typing import Dict, Any
import logging
from pipeline.ingest import Ingest
import config.exit_codes as ec
# Mock configuration for initializing the Ingest class
mock_config = {
'API_TOKEN': 'test_token',
'BUDGET_ID': 'test_budget_id',
'base_url': 'http://test_base_url',
'knowledge_file': 'data/test_knowledge_file.json',
'entities': ['entity1', 'entity2'],
'raw_data_path': 'test_raw_data_path',
'REQUESTS_MAX_RETRIES': 3,
'REQUESTS_RETRY_DELAY': 1
}
# Test for load_knowledge_cache method
def test_load_knowledge_cache_file_exists():
mock_data = {"key": "value"}
with patch('os.path.exists', return_value=True), \
patch('builtins.open', mock_open(read_data=json.dumps(mock_data))) as mock_file:
ingest_instance = Ingest(mock_config)
result = ingest_instance.load_knowledge_cache()
mock_file.assert_called_once_with(mock_config['knowledge_file'], 'r')
assert result == mock_data
def test_load_knowledge_cache_file_not_exists():
with patch('os.path.exists', return_value=False):
ingest_instance = Ingest(mock_config)
result = ingest_instance.load_knowledge_cache()
assert result == {}
# Test for save_entity_data_to_raw method
def test_save_entity_data_to_raw_success():
entity = 'entity1'
data = {"key": "value"}
current_time = '20230101123000'
directory = os.path.join(mock_config['raw_data_path'], entity)
entity_file = f'{directory}/{current_time}.json'
with patch('os.path.exists', return_value=False), \
patch('os.makedirs') as mock_makedirs, \
patch('builtins.open', mock_open()) as mock_file, \
patch('time.strftime', return_value=current_time), \
patch('logging.info') as mock_logging_info:
ingest_instance = Ingest(mock_config)
ingest_instance.save_entity_data_to_raw(entity, data)
mock_makedirs.assert_called_once_with(directory)
mock_file.assert_called_once_with(entity_file, 'w')
# Get the file handle and check the written content
handle = mock_file()
handle.write.assert_called()
written_content = ''.join(call.args[0] for call in handle.write.call_args_list)
assert written_content == json.dumps(data, indent=4)
mock_logging_info.assert_called_once_with(f"Saving {entity} data to {entity_file}")
def test_save_entity_data_to_raw_existing_directory():
entity = 'entity1'
data = {"key": "value"}
current_time = '20230101123000'
directory = os.path.join(mock_config['raw_data_path'], entity)
entity_file = f'{directory}/{current_time}.json'
with patch('os.path.exists', return_value=True), \
patch('os.makedirs') as mock_makedirs, \
patch('builtins.open', mock_open()) as mock_file, \
patch('time.strftime', return_value=current_time), \
patch('logging.info') as mock_logging_info:
ingest_instance = Ingest(mock_config)
ingest_instance.save_entity_data_to_raw(entity, data)
mock_makedirs.assert_not_called()
mock_file.assert_called_once_with(entity_file, 'w')
# Get the file handle and check the written content
handle = mock_file()
handle.write.assert_called()
written_content = ''.join(call.args[0] for call in handle.write.call_args_list)
assert written_content == json.dumps(data, indent=4)
mock_logging_info.assert_called_once_with(f"Saving {entity} data to {entity_file}")
def test_save_entity_data_to_raw_error():
entity = 'entity1'
data = {"key": "value"}
current_time = '20230101123000'
directory = os.path.join(mock_config['raw_data_path'], entity)
entity_file = f'{directory}/{current_time}.json'
with patch('os.path.exists', return_value=True), \
patch('builtins.open', mock_open()) as mock_file, \
patch('time.strftime', return_value=current_time), \
patch('logging.info') as mock_logging_info, \
patch('logging.error') as mock_logging_error:
mock_file.side_effect = Exception("Test error")
ingest_instance = Ingest(mock_config)
with pytest.raises(Exception, match="Test error"):
ingest_instance.save_entity_data_to_raw(entity, data)
mock_logging_error.assert_called_once_with(f"Failed to save data for {entity} to {entity_file}")
def test_update_server_knowledge_cache_file_exists():
entity = 'entity1'
server_knowledge = {"key": "value"}
existing_cache = {"entity2": {"key": "old_value"}}
updated_cache = {"entity2": {"key": "old_value"}, "entity1": {"key": "value"}}
with patch('builtins.open', mock_open(read_data=json.dumps(existing_cache))) as mock_file, \
patch('os.path.exists', return_value=True), \
patch('logging.error') as mock_logging_error:
ingest_instance = Ingest(mock_config)
ingest_instance.update_server_knowledge_cache(entity, server_knowledge)
mock_file.assert_called_with(mock_config['knowledge_file'], 'w')
handle = mock_file()
handle.write.assert_called()
written_content = ''.join(call.args[0] for call in handle.write.call_args_list)
assert json.loads(written_content) == updated_cache
mock_logging_error.assert_not_called()
def test_update_server_knowledge_cache_file_not_exists():
entity = 'entity1'
server_knowledge = {"key": "value"}
updated_cache = {"entity1": {"key": "value"}}
with patch('builtins.open', mock_open()) as mock_file, \
patch('os.path.exists', return_value=False), \
patch('os.makedirs') as mock_makedirs, \
patch('logging.info') as mock_logging_info, \
patch('logging.error') as mock_logging_error:
# Ensure the side_effect list has enough elements to cover all calls to open
mock_file.side_effect = [FileNotFoundError(), mock_open().return_value]
ingest_instance = Ingest(mock_config)
with pytest.raises(FileNotFoundError):
ingest_instance.update_server_knowledge_cache(entity, server_knowledge)
mock_makedirs.assert_called_once_with(os.path.dirname(mock_config['knowledge_file']), exist_ok=True)
mock_file.assert_called_with(mock_config['knowledge_file'], 'w')
mock_logging_error.assert_called_once_with(f"Failed to update knowledge cache for {entity} in {mock_config['knowledge_file']}")
def test_update_server_knowledge_cache_write_error():
entity = 'entity1'
server_knowledge = {"key": "value"}
with patch('builtins.open', mock_open()) as mock_file, \
patch('logging.error') as mock_logging_error:
mock_file.side_effect = Exception("Test error")
ingest_instance = Ingest(mock_config)
with pytest.raises(Exception, match="Test error"):
ingest_instance.update_server_knowledge_cache(entity, server_knowledge)
mock_logging_error.assert_called_once_with(f"Failed to update knowledge cache for {entity} in {mock_config['knowledge_file']}")
def test_check_rate_limit_above_threshold():
response = MagicMock()
response.headers = {'X-Rate-Limit': '10/100'}
ingest_instance = Ingest(mock_config)
result = ingest_instance.check_rate_limit(response)
assert result is None
def test_check_rate_limit_below_threshold():
response = MagicMock()
response.headers = {'X-Rate-Limit': '90/100'}
ingest_instance = Ingest(mock_config)
result = ingest_instance.check_rate_limit(response)
assert result is None
def test_check_rate_limit_exceeded():
response = MagicMock()
response.headers = {'X-Rate-Limit': '100/100'}
ingest_instance = Ingest(mock_config)
result = ingest_instance.check_rate_limit(response)
assert result is True
def test_check_rate_limit_header_missing():
response = MagicMock()
response.headers = {}
ingest_instance = Ingest(mock_config)
result = ingest_instance.check_rate_limit(response)
assert result is None
def test_handle_response_bad_request():
response = MagicMock()
response.status_code = 400
ingest_instance = Ingest(mock_config)
with pytest.raises(SystemExit) as e:
ingest_instance.handle_response(response)
assert e.type == SystemExit
assert e.value.code == ec.BAD_REQUEST
def test_handle_response_unauthorized():
response = MagicMock()
response.status_code = 401
ingest_instance = Ingest(mock_config)
with pytest.raises(SystemExit) as e:
ingest_instance.handle_response(response)
assert e.type == SystemExit
assert e.value.code == ec.UNAUTHORIZED_API_TOKEN
def test_handle_response_forbidden():
response = MagicMock()
response.status_code = 403
ingest_instance = Ingest(mock_config)
with pytest.raises(SystemExit) as e:
ingest_instance.handle_response(response)
assert e.type == SystemExit
assert e.value.code == ec.FORBIDDEN
def test_handle_response_not_found():
response = MagicMock()
response.status_code = 404
ingest_instance = Ingest(mock_config)
with pytest.raises(SystemExit) as e:
ingest_instance.handle_response(response)
assert e.type == SystemExit
assert e.value.code == ec.NOT_FOUND
def test_handle_response_conflict():
response = MagicMock()
response.status_code = 409
ingest_instance = Ingest(mock_config)
with pytest.raises(SystemExit) as e:
ingest_instance.handle_response(response)
assert e.type == SystemExit
assert e.value.code == ec.CONFLICT
def test_handle_response_too_many_requests():
response = MagicMock()
response.status_code = 429
ingest_instance = Ingest(mock_config)
result = ingest_instance.handle_response(response)
assert result is True
def test_handle_response_internal_server_error():
response = MagicMock()
response.status_code = 500
ingest_instance = Ingest(mock_config)
result = ingest_instance.handle_response(response)
assert result is True
def test_handle_response_service_unavailable():
response = MagicMock()
response.status_code = 503
ingest_instance = Ingest(mock_config)
result = ingest_instance.handle_response(response)
assert result is True
def test_handle_response_ok():
response = MagicMock()
response.status_code = 200
ingest_instance = Ingest(mock_config)
result = ingest_instance.handle_response(response)
assert result is False
if __name__ == "__main__":
pytest.main()
+71
View File
@@ -0,0 +1,71 @@
import pytest
from unittest.mock import patch, mock_open, MagicMock
import yaml
import logging
import atexit
import sys
from main import set_up_logging, load_config
import config.exit_codes as ec
# Test for set_up_logging function
def test_set_up_logging_success():
with patch('builtins.open', mock_open(read_data="handlers:\n queue_handler:\n class: logging.handlers.QueueHandler")), \
patch('yaml.safe_load', return_value={"handlers": {"queue_handler": {"class": "logging.handlers.QueueHandler"}}}), \
patch('logging.config.dictConfig') as mock_dict_config, \
patch('logging.getHandlerByName', return_value=MagicMock(listener=MagicMock(start=MagicMock(), stop=MagicMock()))), \
patch('atexit.register') as mock_atexit_register:
set_up_logging()
mock_dict_config.assert_called_once_with({"handlers": {"queue_handler": {"class": "logging.handlers.QueueHandler"}}})
mock_atexit_register.assert_called_once()
def test_set_up_logging_yaml_error():
with patch('builtins.open', mock_open(read_data="invalid_yaml")), \
patch('yaml.safe_load', side_effect=yaml.YAMLError("Error")), \
patch('logging.basicConfig') as mock_basic_config:
set_up_logging()
mock_basic_config.assert_called_once_with(level=logging.INFO)
def test_set_up_logging_no_queue_handler():
with patch('builtins.open', mock_open(read_data="handlers:\n queue_handler:\n class: logging.handlers.QueueHandler")), \
patch('yaml.safe_load', return_value={"handlers": {"queue_handler": {"class": "logging.handlers.QueueHandler"}}}), \
patch('logging.config.dictConfig') as mock_dict_config, \
patch('logging.getHandlerByName', return_value=None):
set_up_logging()
mock_dict_config.assert_called_once_with({"handlers": {"queue_handler": {"class": "logging.handlers.QueueHandler"}}})
# Test for load_config function
def test_load_config_success():
with patch('builtins.open', mock_open(read_data="key: value")), \
patch('yaml.safe_load', return_value={"key": "value"}):
config = load_config()
assert config == {"key": "value"}
def test_load_config_file_not_found():
with patch('builtins.open', side_effect=FileNotFoundError), \
patch('logging.error') as mock_logging_error, \
patch('sys.exit') as mock_sys_exit:
load_config()
mock_logging_error.assert_called_once_with('config.yaml file not found')
mock_sys_exit.assert_called_once_with(ec.MISSING_CONFIG_FILE)
def test_load_config_yaml_error():
with patch('builtins.open', mock_open(read_data="invalid_yaml")), \
patch('yaml.safe_load', side_effect=yaml.YAMLError("Error")), \
patch('logging.error') as mock_logging_error, \
patch('sys.exit') as mock_sys_exit:
load_config()
mock_logging_error.assert_called_once()
mock_sys_exit.assert_called_once_with(ec.CORRUPTED_CONFIG_FILE)