Files
data_pipeline_for_YNAB/tests/test_ingest.py
T
2025-05-30 11:43:47 +01:00

305 lines
10 KiB
Python

import pytest
from unittest.mock import patch, mock_open, MagicMock
import json
import os
from pipeline.ingest import Ingest
import config.exit_codes as ec
# Mock configuration for initializing the Ingest class
mock_config = {
'API_TOKEN': 'test_token',
'BUDGET_ID': 'test_budget_id',
'base_url': 'http://test_base_url',
'knowledge_file': 'data/test_knowledge_file.json',
'entities': ['entity1', 'entity2'],
'raw_data_path': 'test_raw_data_path',
'REQUESTS_MAX_RETRIES': 3,
'REQUESTS_RETRY_DELAY': 1
}
# Test for load_knowledge_cache method
def test_load_knowledge_cache_file_exists():
mock_data = {"key": "value"}
with patch('os.path.exists', return_value=True), \
patch('builtins.open', mock_open(read_data=json.dumps(mock_data))) as mock_file:
ingest_instance = Ingest(mock_config)
result = ingest_instance.load_knowledge_cache()
mock_file.assert_called_once_with(mock_config['knowledge_file'], 'r')
assert result == mock_data
def test_load_knowledge_cache_file_not_exists():
with patch('os.path.exists', return_value=False):
ingest_instance = Ingest(mock_config)
result = ingest_instance.load_knowledge_cache()
assert result == {}
# Test for save_entity_data_to_raw method
def test_save_entity_data_to_raw_success():
entity = 'entity1'
data = {"key": "value"}
current_time = '20230101123000'
directory = os.path.join(mock_config['raw_data_path'], entity)
entity_file = f'{directory}/{current_time}.json'
with patch('os.path.exists', return_value=False), \
patch('os.makedirs') as mock_makedirs, \
patch('builtins.open', mock_open()) as mock_file, \
patch('time.strftime', return_value=current_time), \
patch('logging.info') as mock_logging_info:
ingest_instance = Ingest(mock_config)
ingest_instance.save_entity_data_to_raw(entity, data)
mock_makedirs.assert_called_once_with(directory)
mock_file.assert_called_once_with(entity_file, 'w')
# Get the file handle and check the written content
handle = mock_file()
handle.write.assert_called()
written_content = ''.join(call.args[0] for call in handle.write.call_args_list)
assert written_content == json.dumps(data, indent=4)
mock_logging_info.assert_called_once_with(f"Saving {entity} data to {entity_file}")
def test_save_entity_data_to_raw_existing_directory():
entity = 'entity1'
data = {"key": "value"}
current_time = '20230101123000'
directory = os.path.join(mock_config['raw_data_path'], entity)
entity_file = f'{directory}/{current_time}.json'
with patch('os.path.exists', return_value=True), \
patch('os.makedirs') as mock_makedirs, \
patch('builtins.open', mock_open()) as mock_file, \
patch('time.strftime', return_value=current_time), \
patch('logging.info') as mock_logging_info:
ingest_instance = Ingest(mock_config)
ingest_instance.save_entity_data_to_raw(entity, data)
mock_makedirs.assert_not_called()
mock_file.assert_called_once_with(entity_file, 'w')
# Get the file handle and check the written content
handle = mock_file()
handle.write.assert_called()
written_content = ''.join(call.args[0] for call in handle.write.call_args_list)
assert written_content == json.dumps(data, indent=4)
mock_logging_info.assert_called_once_with(f"Saving {entity} data to {entity_file}")
def test_save_entity_data_to_raw_error():
entity = 'entity1'
data = {"key": "value"}
current_time = '20230101123000'
directory = os.path.join(mock_config['raw_data_path'], entity)
entity_file = f'{directory}/{current_time}.json'
with patch('os.path.exists', return_value=True), \
patch('builtins.open', mock_open()) as mock_file, \
patch('time.strftime', return_value=current_time), \
patch('logging.info') as mock_logging_info, \
patch('logging.error') as mock_logging_error:
mock_file.side_effect = Exception("Test error")
ingest_instance = Ingest(mock_config)
with pytest.raises(Exception, match="Test error"):
ingest_instance.save_entity_data_to_raw(entity, data)
mock_logging_error.assert_called_once_with(f"Failed to save data for {entity} to {entity_file}")
def test_update_server_knowledge_cache_file_exists():
entity = 'entity1'
server_knowledge = {"key": "value"}
existing_cache = {"entity2": {"key": "old_value"}}
updated_cache = {"entity2": {"key": "old_value"}, "entity1": {"key": "value"}}
with patch('builtins.open', mock_open(read_data=json.dumps(existing_cache))) as mock_file, \
patch('os.path.exists', return_value=True), \
patch('logging.error') as mock_logging_error:
ingest_instance = Ingest(mock_config)
ingest_instance.update_server_knowledge_cache(entity, server_knowledge)
mock_file.assert_called_with(mock_config['knowledge_file'], 'w')
handle = mock_file()
handle.write.assert_called()
written_content = ''.join(call.args[0] for call in handle.write.call_args_list)
assert json.loads(written_content) == updated_cache
mock_logging_error.assert_not_called()
def test_update_server_knowledge_cache_file_not_exists():
entity = 'entity1'
server_knowledge = {"key": "value"}
updated_cache = {"entity1": {"key": "value"}}
with patch('builtins.open', mock_open()) as mock_file, \
patch('os.path.exists', return_value=False), \
patch('os.makedirs') as mock_makedirs, \
patch('logging.info') as mock_logging_info, \
patch('logging.error') as mock_logging_error:
# Ensure the side_effect list has enough elements to cover all calls to open
mock_file.side_effect = [FileNotFoundError(), mock_open().return_value]
ingest_instance = Ingest(mock_config)
with pytest.raises(FileNotFoundError):
ingest_instance.update_server_knowledge_cache(entity, server_knowledge)
mock_makedirs.assert_called_once_with(os.path.dirname(mock_config['knowledge_file']), exist_ok=True)
mock_file.assert_called_with(mock_config['knowledge_file'], 'w')
mock_logging_error.assert_called_once_with(f"Failed to update knowledge cache for {entity} in {mock_config['knowledge_file']}")
def test_update_server_knowledge_cache_write_error():
entity = 'entity1'
server_knowledge = {"key": "value"}
with patch('builtins.open', mock_open()) as mock_file, \
patch('logging.error') as mock_logging_error:
mock_file.side_effect = Exception("Test error")
ingest_instance = Ingest(mock_config)
with pytest.raises(Exception, match="Test error"):
ingest_instance.update_server_knowledge_cache(entity, server_knowledge)
mock_logging_error.assert_called_once_with(f"Failed to update knowledge cache for {entity} in {mock_config['knowledge_file']}")
def test_check_rate_limit_above_threshold():
response = MagicMock()
response.headers = {'X-Rate-Limit': '10/100'}
ingest_instance = Ingest(mock_config)
result = ingest_instance.check_rate_limit(response)
assert result is None
def test_check_rate_limit_below_threshold():
response = MagicMock()
response.headers = {'X-Rate-Limit': '90/100'}
ingest_instance = Ingest(mock_config)
result = ingest_instance.check_rate_limit(response)
assert result is None
def test_check_rate_limit_exceeded():
response = MagicMock()
response.headers = {'X-Rate-Limit': '100/100'}
ingest_instance = Ingest(mock_config)
result = ingest_instance.check_rate_limit(response)
assert result is True
def test_check_rate_limit_header_missing():
response = MagicMock()
response.headers = {}
ingest_instance = Ingest(mock_config)
result = ingest_instance.check_rate_limit(response)
assert result is None
def test_handle_response_bad_request():
response = MagicMock()
response.status_code = 400
ingest_instance = Ingest(mock_config)
with pytest.raises(SystemExit) as e:
ingest_instance.handle_response(response)
assert e.type == SystemExit
assert e.value.code == ec.BAD_REQUEST
def test_handle_response_unauthorized():
response = MagicMock()
response.status_code = 401
ingest_instance = Ingest(mock_config)
with pytest.raises(SystemExit) as e:
ingest_instance.handle_response(response)
assert e.type == SystemExit
assert e.value.code == ec.UNAUTHORIZED_API_TOKEN
def test_handle_response_forbidden():
response = MagicMock()
response.status_code = 403
ingest_instance = Ingest(mock_config)
with pytest.raises(SystemExit) as e:
ingest_instance.handle_response(response)
assert e.type == SystemExit
assert e.value.code == ec.FORBIDDEN
def test_handle_response_not_found():
response = MagicMock()
response.status_code = 404
ingest_instance = Ingest(mock_config)
with pytest.raises(SystemExit) as e:
ingest_instance.handle_response(response)
assert e.type == SystemExit
assert e.value.code == ec.NOT_FOUND
def test_handle_response_conflict():
response = MagicMock()
response.status_code = 409
ingest_instance = Ingest(mock_config)
with pytest.raises(SystemExit) as e:
ingest_instance.handle_response(response)
assert e.type == SystemExit
assert e.value.code == ec.CONFLICT
def test_handle_response_too_many_requests():
response = MagicMock()
response.status_code = 429
ingest_instance = Ingest(mock_config)
result = ingest_instance.handle_response(response)
assert result is True
def test_handle_response_internal_server_error():
response = MagicMock()
response.status_code = 500
ingest_instance = Ingest(mock_config)
result = ingest_instance.handle_response(response)
assert result is True
def test_handle_response_service_unavailable():
response = MagicMock()
response.status_code = 503
ingest_instance = Ingest(mock_config)
result = ingest_instance.handle_response(response)
assert result is True
def test_handle_response_ok():
response = MagicMock()
response.status_code = 200
ingest_instance = Ingest(mock_config)
result = ingest_instance.handle_response(response)
assert result is False
if __name__ == "__main__":
pytest.main()