From 5af82e5753d40fd899d0e27439c0443df3c5b984 Mon Sep 17 00:00:00 2001 From: Jake-Pullen <88822333+Jake-Pullen@users.noreply.github.com> Date: Fri, 4 Apr 2025 18:49:44 +0100 Subject: [PATCH] Bug fix no more request limit (#18) * added tests * Removed method no longer used due to YNAB api Changes --- .gitignore | 3 +- __init__.py | 0 main.py | 31 ++-- pipeline/ingest.py | 76 ++++------ pipeline/pipeline_main.py | 3 +- requirements.txt | 4 +- tests/test_ingest.py | 307 ++++++++++++++++++++++++++++++++++++++ tests/test_main.py | 71 +++++++++ 8 files changed, 432 insertions(+), 63 deletions(-) create mode 100644 __init__.py create mode 100644 tests/test_ingest.py create mode 100644 tests/test_main.py diff --git a/.gitignore b/.gitignore index a185ba9..f4dbd26 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,5 @@ __pycache__/* */__pycache__/* *.pbix /logs/* -test.py +.vscode/* +*.coverage diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/main.py b/main.py index 41f079a..2ef5268 100644 --- a/main.py +++ b/main.py @@ -24,6 +24,18 @@ def set_up_logging(): queue_handler.listener.start() atexit.register(queue_handler.listener.stop) +def load_config(): + try: + with open('config/config.yaml', 'r') as file: + config = yaml.safe_load(file) + return config + except FileNotFoundError: + logging.error('config.yaml file not found') + sys.exit(ec.MISSING_CONFIG_FILE) + except yaml.YAMLError as e: + logging.error(f'Error loading config.yaml: {e}') + sys.exit(ec.CORRUPTED_CONFIG_FILE) + logger = logging.getLogger("data_pipeline_for_ynab") os.makedirs('logs', exist_ok=True) set_up_logging() @@ -34,27 +46,14 @@ dotenv.load_dotenv() API_TOKEN = os.getenv('API_TOKEN') BUDGET_ID = os.getenv('BUDGET_ID') - if not API_TOKEN or not BUDGET_ID: logging.error('API_TOKEN or BUDGET_ID is not set in .env file') sys.exit(ec.MISSING_ENV_VARS) -try: - with open('config/config.yaml', 'r') as file: - config = yaml.safe_load(file) -except FileNotFoundError: - logging.error('config.yaml file not found') - sys.exit(ec.MISSING_CONFIG_FILE) -except yaml.YAMLError as e: - logging.error(f'Error loading config.yaml: {e}') - sys.exit(ec.CORRUPTED_CONFIG_FILE) - -config['API_TOKEN'] = API_TOKEN -config['BUDGET_ID'] = BUDGET_ID - - #sys.exit(ec.SUCCESS) - if __name__ == '__main__': + config = load_config() + config['API_TOKEN'] = API_TOKEN + config['BUDGET_ID'] = BUDGET_ID try: pipeline_main(config) diff --git a/pipeline/ingest.py b/pipeline/ingest.py index 6b25b27..6b16c39 100644 --- a/pipeline/ingest.py +++ b/pipeline/ingest.py @@ -4,13 +4,11 @@ import json import logging import requests import sys -import yaml from typing import Dict, Any import config.exit_codes as ec class Ingest: - def __init__(self, config: Dict[str, Any]): """ Initialize the Ingest class with the provided configuration. @@ -22,19 +20,9 @@ class Ingest: self.entities = config['entities'] self.raw_data_path = config['raw_data_path'] self.headers = {'Authorization': f'Bearer {self.api_token}'} - self.knowledge_cache = self.load_knowledge_cache() self.MAX_RETRIES = config['REQUESTS_MAX_RETRIES'] self.RETRY_DELAY = config['REQUESTS_RETRY_DELAY'] - self.fetch_and_cache_entity_data() - def load_knowledge_cache(self) -> Dict[str, Any]: - """ - Load the knowledge cache from the file if it exists. - """ - if os.path.exists(self.knowledge_file): - with open(self.knowledge_file, 'r') as f: - return json.load(f) - return {} def save_entity_data_to_raw(self, entity: str, data: Dict[str, Any]): """ @@ -50,8 +38,18 @@ class Ingest: with open(entity_file, 'w') as f: json.dump(data, f, indent=4) except Exception as e: - logging.error(f"Error saving {entity} data: {e}") + logging.error(f"Failed to save data for {entity} to {entity_file}") + raise e + def load_knowledge_cache(self) -> Dict[str, Any]: + """ + Load the knowledge cache from the file if it exists. + """ + if not os.path.exists(self.knowledge_file): + os.makedirs(os.path.dirname(self.knowledge_file),exist_ok=True) + return {} + with open(self.knowledge_file, 'r') as f: + return json.load(f) def update_server_knowledge_cache(self, entity: str, server_knowledge: Any): """ @@ -64,31 +62,21 @@ class Ingest: logging.info(f"Knowledge file not found. Creating a new one at {self.knowledge_file}. This is normal for the first run.") os.makedirs(os.path.dirname(self.knowledge_file), exist_ok=True) knowledge_cache = {} - + knowledge_cache[entity] = server_knowledge - + with open(self.knowledge_file, 'w') as f: json.dump(knowledge_cache, f, indent=4) - def check_rate_limit(self, response: requests.Response): - """ - Check and handle the rate limit based on the response headers. - """ - rate_limit_header = response.headers.get('X-Rate-Limit') - if rate_limit_header: - requests_made, limit = map(int, rate_limit_header.split('/')) - remaining_requests = limit - requests_made - logging.info(f"Rate Limit: {remaining_requests}/{limit} requests remaining.") - if remaining_requests < 20: - logging.warning("Approaching rate limit. Consider pausing further requests.") - # Implement pause or delay logic here if necessary - if remaining_requests == 1: - logging.error("Rate limit exceeded. ending requests here and moving on with what we have.") - return True #returning True here to break out of any more ingestions - - else: - logging.warning("X-Rate-Limit header is missing.") - + knowledge_cache = self.load_knowledge_cache() + knowledge_cache[entity] = server_knowledge + try: + with open(self.knowledge_file, 'w') as f: + json.dump(knowledge_cache, f, indent=4) + except Exception as e: + logging.error(f"Failed to update knowledge cache for {entity} in {self.knowledge_file}") + raise e + def handle_response(self, response) -> bool: if response.status_code == 400: logging.error("Bad request. The request could not be understood by the API due to malformed syntax or validation errors.") @@ -100,14 +88,14 @@ class Ingest: logging.error("Forbidden. Access is denied.") sys.exit(ec.FORBIDDEN) elif response.status_code == 404: - logging.error("Not found. The specified URI does not exist.") + logging.error("Not found. The specified URL does not exist.") sys.exit(ec.NOT_FOUND) elif response.status_code == 409: logging.error("Conflict. The resource cannot be saved due to a conflict.") sys.exit(ec.CONFLICT) elif response.status_code == 429: logging.error("Too many requests. You have made too many requests in a short amount of time.") - return True + return True elif response.status_code == 500: logging.error("Internal server error. The API experienced an unexpected error.") return True @@ -118,7 +106,7 @@ class Ingest: response.raise_for_status() return False - def fetch_and_cache_entity_data(self): + def start_ingestion(self): """ Fetch and cache data for all entities. """ @@ -128,11 +116,13 @@ class Ingest: logging.warning(f"Raw data exists for {entity} processing any raw data we already have.") break # break here instead of continue as we dont want to update our server knowledge cache and potentially miss data. - last_knowledge = self.knowledge_cache.get(entity, 0) + knowledge_cache = self.load_knowledge_cache() + last_knowledge = knowledge_cache.get(entity, 0) #logging.debug(f'Last Knowledge of {entity}: {last_knowledge}') + logging.info(f'Fetching {entity} data since last knowledge: {last_knowledge}') url = f'{self.base_url}/{self.budget_id}/{entity}?last_knowledge_of_server={last_knowledge}' - + response = None for attempt in range(self.MAX_RETRIES): try: response = requests.get(url, headers=self.headers) @@ -146,11 +136,12 @@ class Ingest: else: logging.error("Max retries reached. Exiting.") sys.exit(ec.REQUESTS_ERROR) - + data = response.json() + logging.debug(f'response data: {data}') server_knowledge = data['data'].get('server_knowledge') logging.debug(f'{entity} new server knowledge: {server_knowledge}') - + if server_knowledge is not None and server_knowledge != last_knowledge: self.update_server_knowledge_cache(entity, server_knowledge) entity_data = data['data'] @@ -158,6 +149,3 @@ class Ingest: self.save_entity_data_to_raw(entity, entity_data) else: logging.info(f"No new data for {entity}. Skipping cache update.") - - if self.check_rate_limit(response): - break # break out here and continue processing the data we have. diff --git a/pipeline/pipeline_main.py b/pipeline/pipeline_main.py index 05d3a9b..a2e5045 100644 --- a/pipeline/pipeline_main.py +++ b/pipeline/pipeline_main.py @@ -12,7 +12,8 @@ def pipeline_main(config): '''Run the data pipeline''' logging.info('Starting data pipeline') - Ingest(config) + ingest = Ingest(config) + ingest.start_ingestion() RawToBase(config) DimAccounts(config) DimCategories(config) diff --git a/requirements.txt b/requirements.txt index 1642cd8..f5094d3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,4 +6,6 @@ pyyaml dash pandas pyarrow -dash-bootstrap-components \ No newline at end of file +dash-bootstrap-components +# testing requirements below +pytest \ No newline at end of file diff --git a/tests/test_ingest.py b/tests/test_ingest.py new file mode 100644 index 0000000..e51e86f --- /dev/null +++ b/tests/test_ingest.py @@ -0,0 +1,307 @@ +import pytest +from unittest.mock import patch, mock_open, MagicMock +import json +import os +from typing import Dict, Any +import logging + +from pipeline.ingest import Ingest +import config.exit_codes as ec + +# Mock configuration for initializing the Ingest class +mock_config = { + 'API_TOKEN': 'test_token', + 'BUDGET_ID': 'test_budget_id', + 'base_url': 'http://test_base_url', + 'knowledge_file': 'data/test_knowledge_file.json', + 'entities': ['entity1', 'entity2'], + 'raw_data_path': 'test_raw_data_path', + 'REQUESTS_MAX_RETRIES': 3, + 'REQUESTS_RETRY_DELAY': 1 +} + +# Test for load_knowledge_cache method +def test_load_knowledge_cache_file_exists(): + mock_data = {"key": "value"} + with patch('os.path.exists', return_value=True), \ + patch('builtins.open', mock_open(read_data=json.dumps(mock_data))) as mock_file: + + ingest_instance = Ingest(mock_config) + result = ingest_instance.load_knowledge_cache() + + mock_file.assert_called_once_with(mock_config['knowledge_file'], 'r') + assert result == mock_data + +def test_load_knowledge_cache_file_not_exists(): + with patch('os.path.exists', return_value=False): + + ingest_instance = Ingest(mock_config) + result = ingest_instance.load_knowledge_cache() + + assert result == {} + +# Test for save_entity_data_to_raw method +def test_save_entity_data_to_raw_success(): + entity = 'entity1' + data = {"key": "value"} + current_time = '20230101123000' + directory = os.path.join(mock_config['raw_data_path'], entity) + entity_file = f'{directory}/{current_time}.json' + + with patch('os.path.exists', return_value=False), \ + patch('os.makedirs') as mock_makedirs, \ + patch('builtins.open', mock_open()) as mock_file, \ + patch('time.strftime', return_value=current_time), \ + patch('logging.info') as mock_logging_info: + + ingest_instance = Ingest(mock_config) + ingest_instance.save_entity_data_to_raw(entity, data) + + mock_makedirs.assert_called_once_with(directory) + mock_file.assert_called_once_with(entity_file, 'w') + + # Get the file handle and check the written content + handle = mock_file() + handle.write.assert_called() + written_content = ''.join(call.args[0] for call in handle.write.call_args_list) + assert written_content == json.dumps(data, indent=4) + + mock_logging_info.assert_called_once_with(f"Saving {entity} data to {entity_file}") + +def test_save_entity_data_to_raw_existing_directory(): + entity = 'entity1' + data = {"key": "value"} + current_time = '20230101123000' + directory = os.path.join(mock_config['raw_data_path'], entity) + entity_file = f'{directory}/{current_time}.json' + + with patch('os.path.exists', return_value=True), \ + patch('os.makedirs') as mock_makedirs, \ + patch('builtins.open', mock_open()) as mock_file, \ + patch('time.strftime', return_value=current_time), \ + patch('logging.info') as mock_logging_info: + + ingest_instance = Ingest(mock_config) + ingest_instance.save_entity_data_to_raw(entity, data) + + mock_makedirs.assert_not_called() + mock_file.assert_called_once_with(entity_file, 'w') + + # Get the file handle and check the written content + handle = mock_file() + handle.write.assert_called() + written_content = ''.join(call.args[0] for call in handle.write.call_args_list) + assert written_content == json.dumps(data, indent=4) + + mock_logging_info.assert_called_once_with(f"Saving {entity} data to {entity_file}") + +def test_save_entity_data_to_raw_error(): + entity = 'entity1' + data = {"key": "value"} + current_time = '20230101123000' + directory = os.path.join(mock_config['raw_data_path'], entity) + entity_file = f'{directory}/{current_time}.json' + + with patch('os.path.exists', return_value=True), \ + patch('builtins.open', mock_open()) as mock_file, \ + patch('time.strftime', return_value=current_time), \ + patch('logging.info') as mock_logging_info, \ + patch('logging.error') as mock_logging_error: + + mock_file.side_effect = Exception("Test error") + + ingest_instance = Ingest(mock_config) + + with pytest.raises(Exception, match="Test error"): + ingest_instance.save_entity_data_to_raw(entity, data) + + mock_logging_error.assert_called_once_with(f"Failed to save data for {entity} to {entity_file}") + +def test_update_server_knowledge_cache_file_exists(): + entity = 'entity1' + server_knowledge = {"key": "value"} + existing_cache = {"entity2": {"key": "old_value"}} + updated_cache = {"entity2": {"key": "old_value"}, "entity1": {"key": "value"}} + + with patch('builtins.open', mock_open(read_data=json.dumps(existing_cache))) as mock_file, \ + patch('os.path.exists', return_value=True), \ + patch('logging.error') as mock_logging_error: + + ingest_instance = Ingest(mock_config) + ingest_instance.update_server_knowledge_cache(entity, server_knowledge) + + mock_file.assert_called_with(mock_config['knowledge_file'], 'w') + handle = mock_file() + handle.write.assert_called() + written_content = ''.join(call.args[0] for call in handle.write.call_args_list) + assert json.loads(written_content) == updated_cache + mock_logging_error.assert_not_called() + +def test_update_server_knowledge_cache_file_not_exists(): + entity = 'entity1' + server_knowledge = {"key": "value"} + updated_cache = {"entity1": {"key": "value"}} + + with patch('builtins.open', mock_open()) as mock_file, \ + patch('os.path.exists', return_value=False), \ + patch('os.makedirs') as mock_makedirs, \ + patch('logging.info') as mock_logging_info, \ + patch('logging.error') as mock_logging_error: + + # Ensure the side_effect list has enough elements to cover all calls to open + mock_file.side_effect = [FileNotFoundError(), mock_open().return_value] + + ingest_instance = Ingest(mock_config) + + with pytest.raises(FileNotFoundError): + ingest_instance.update_server_knowledge_cache(entity, server_knowledge) + + mock_makedirs.assert_called_once_with(os.path.dirname(mock_config['knowledge_file']), exist_ok=True) + mock_file.assert_called_with(mock_config['knowledge_file'], 'w') + mock_logging_error.assert_called_once_with(f"Failed to update knowledge cache for {entity} in {mock_config['knowledge_file']}") + +def test_update_server_knowledge_cache_write_error(): + entity = 'entity1' + server_knowledge = {"key": "value"} + + with patch('builtins.open', mock_open()) as mock_file, \ + patch('logging.error') as mock_logging_error: + + mock_file.side_effect = Exception("Test error") + + ingest_instance = Ingest(mock_config) + + with pytest.raises(Exception, match="Test error"): + ingest_instance.update_server_knowledge_cache(entity, server_knowledge) + + mock_logging_error.assert_called_once_with(f"Failed to update knowledge cache for {entity} in {mock_config['knowledge_file']}") + +def test_check_rate_limit_above_threshold(): + response = MagicMock() + response.headers = {'X-Rate-Limit': '10/100'} + + ingest_instance = Ingest(mock_config) + result = ingest_instance.check_rate_limit(response) + + assert result is None + +def test_check_rate_limit_below_threshold(): + response = MagicMock() + response.headers = {'X-Rate-Limit': '90/100'} + + ingest_instance = Ingest(mock_config) + result = ingest_instance.check_rate_limit(response) + + assert result is None + +def test_check_rate_limit_exceeded(): + response = MagicMock() + response.headers = {'X-Rate-Limit': '100/100'} + + ingest_instance = Ingest(mock_config) + result = ingest_instance.check_rate_limit(response) + + assert result is True + +def test_check_rate_limit_header_missing(): + response = MagicMock() + response.headers = {} + + ingest_instance = Ingest(mock_config) + result = ingest_instance.check_rate_limit(response) + + assert result is None + +def test_handle_response_bad_request(): + response = MagicMock() + response.status_code = 400 + + ingest_instance = Ingest(mock_config) + + with pytest.raises(SystemExit) as e: + ingest_instance.handle_response(response) + assert e.type == SystemExit + assert e.value.code == ec.BAD_REQUEST + +def test_handle_response_unauthorized(): + response = MagicMock() + response.status_code = 401 + + ingest_instance = Ingest(mock_config) + + with pytest.raises(SystemExit) as e: + ingest_instance.handle_response(response) + assert e.type == SystemExit + assert e.value.code == ec.UNAUTHORIZED_API_TOKEN + +def test_handle_response_forbidden(): + response = MagicMock() + response.status_code = 403 + + ingest_instance = Ingest(mock_config) + + with pytest.raises(SystemExit) as e: + ingest_instance.handle_response(response) + assert e.type == SystemExit + assert e.value.code == ec.FORBIDDEN + +def test_handle_response_not_found(): + response = MagicMock() + response.status_code = 404 + + ingest_instance = Ingest(mock_config) + + with pytest.raises(SystemExit) as e: + ingest_instance.handle_response(response) + assert e.type == SystemExit + assert e.value.code == ec.NOT_FOUND + +def test_handle_response_conflict(): + response = MagicMock() + response.status_code = 409 + + ingest_instance = Ingest(mock_config) + + with pytest.raises(SystemExit) as e: + ingest_instance.handle_response(response) + assert e.type == SystemExit + assert e.value.code == ec.CONFLICT + +def test_handle_response_too_many_requests(): + response = MagicMock() + response.status_code = 429 + + ingest_instance = Ingest(mock_config) + + result = ingest_instance.handle_response(response) + assert result is True + +def test_handle_response_internal_server_error(): + response = MagicMock() + response.status_code = 500 + + ingest_instance = Ingest(mock_config) + + result = ingest_instance.handle_response(response) + assert result is True + +def test_handle_response_service_unavailable(): + response = MagicMock() + response.status_code = 503 + + ingest_instance = Ingest(mock_config) + + result = ingest_instance.handle_response(response) + assert result is True + +def test_handle_response_ok(): + response = MagicMock() + response.status_code = 200 + + ingest_instance = Ingest(mock_config) + + result = ingest_instance.handle_response(response) + assert result is False + +if __name__ == "__main__": + pytest.main() \ No newline at end of file diff --git a/tests/test_main.py b/tests/test_main.py new file mode 100644 index 0000000..8842ada --- /dev/null +++ b/tests/test_main.py @@ -0,0 +1,71 @@ +import pytest +from unittest.mock import patch, mock_open, MagicMock +import yaml +import logging +import atexit +import sys + +from main import set_up_logging, load_config +import config.exit_codes as ec + +# Test for set_up_logging function +def test_set_up_logging_success(): + with patch('builtins.open', mock_open(read_data="handlers:\n queue_handler:\n class: logging.handlers.QueueHandler")), \ + patch('yaml.safe_load', return_value={"handlers": {"queue_handler": {"class": "logging.handlers.QueueHandler"}}}), \ + patch('logging.config.dictConfig') as mock_dict_config, \ + patch('logging.getHandlerByName', return_value=MagicMock(listener=MagicMock(start=MagicMock(), stop=MagicMock()))), \ + patch('atexit.register') as mock_atexit_register: + + set_up_logging() + + mock_dict_config.assert_called_once_with({"handlers": {"queue_handler": {"class": "logging.handlers.QueueHandler"}}}) + mock_atexit_register.assert_called_once() + +def test_set_up_logging_yaml_error(): + with patch('builtins.open', mock_open(read_data="invalid_yaml")), \ + patch('yaml.safe_load', side_effect=yaml.YAMLError("Error")), \ + patch('logging.basicConfig') as mock_basic_config: + + set_up_logging() + + mock_basic_config.assert_called_once_with(level=logging.INFO) + +def test_set_up_logging_no_queue_handler(): + with patch('builtins.open', mock_open(read_data="handlers:\n queue_handler:\n class: logging.handlers.QueueHandler")), \ + patch('yaml.safe_load', return_value={"handlers": {"queue_handler": {"class": "logging.handlers.QueueHandler"}}}), \ + patch('logging.config.dictConfig') as mock_dict_config, \ + patch('logging.getHandlerByName', return_value=None): + + set_up_logging() + + mock_dict_config.assert_called_once_with({"handlers": {"queue_handler": {"class": "logging.handlers.QueueHandler"}}}) + +# Test for load_config function +def test_load_config_success(): + with patch('builtins.open', mock_open(read_data="key: value")), \ + patch('yaml.safe_load', return_value={"key": "value"}): + + config = load_config() + + assert config == {"key": "value"} + +def test_load_config_file_not_found(): + with patch('builtins.open', side_effect=FileNotFoundError), \ + patch('logging.error') as mock_logging_error, \ + patch('sys.exit') as mock_sys_exit: + + load_config() + + mock_logging_error.assert_called_once_with('config.yaml file not found') + mock_sys_exit.assert_called_once_with(ec.MISSING_CONFIG_FILE) + +def test_load_config_yaml_error(): + with patch('builtins.open', mock_open(read_data="invalid_yaml")), \ + patch('yaml.safe_load', side_effect=yaml.YAMLError("Error")), \ + patch('logging.error') as mock_logging_error, \ + patch('sys.exit') as mock_sys_exit: + + load_config() + + mock_logging_error.assert_called_once() + mock_sys_exit.assert_called_once_with(ec.CORRUPTED_CONFIG_FILE) \ No newline at end of file