import pytest from unittest.mock import patch, mock_open, MagicMock import json import os from pipeline.ingest import Ingest import config.exit_codes as ec # Mock configuration for initializing the Ingest class mock_config = { "API_TOKEN": "test_token", "BUDGET_ID": "test_budget_id", "base_url": "http://test_base_url", "knowledge_file": "data/test_knowledge_file.json", "entities": ["entity1", "entity2"], "raw_data_path": "test_raw_data_path", "REQUESTS_MAX_RETRIES": 3, "REQUESTS_RETRY_DELAY": 1, } # Test for load_knowledge_cache method def test_load_knowledge_cache_file_exists(): mock_data = {"key": "value"} with ( patch("os.path.exists", return_value=True), patch("builtins.open", mock_open(read_data=json.dumps(mock_data))) as mock_file, ): ingest_instance = Ingest(mock_config) result = ingest_instance.load_knowledge_cache() mock_file.assert_called_once_with(mock_config["knowledge_file"], "r") assert result == mock_data def test_load_knowledge_cache_file_not_exists(): with patch("os.path.exists", return_value=False): ingest_instance = Ingest(mock_config) result = ingest_instance.load_knowledge_cache() assert result == {} # Test for save_entity_data_to_raw method def test_save_entity_data_to_raw_success(): entity = "entity1" data = {"key": "value"} current_time = "20230101123000" directory = os.path.join(mock_config["raw_data_path"], entity) entity_file = f"{directory}/{current_time}.json" with ( patch("os.path.exists", return_value=False), patch("os.makedirs") as mock_makedirs, patch("builtins.open", mock_open()) as mock_file, patch("time.strftime", return_value=current_time), patch("logging.info") as mock_logging_info, ): ingest_instance = Ingest(mock_config) ingest_instance.save_entity_data_to_raw(entity, data) mock_makedirs.assert_called_once_with(directory) mock_file.assert_called_once_with(entity_file, "w") # Get the file handle and check the written content handle = mock_file() handle.write.assert_called() written_content = "".join(call.args[0] for call in handle.write.call_args_list) assert written_content == json.dumps(data, indent=4) mock_logging_info.assert_called_once_with( f"Saving {entity} data to {entity_file}" ) def test_save_entity_data_to_raw_existing_directory(): entity = "entity1" data = {"key": "value"} current_time = "20230101123000" directory = os.path.join(mock_config["raw_data_path"], entity) entity_file = f"{directory}/{current_time}.json" with ( patch("os.path.exists", return_value=True), patch("os.makedirs") as mock_makedirs, patch("builtins.open", mock_open()) as mock_file, patch("time.strftime", return_value=current_time), patch("logging.info") as mock_logging_info, ): ingest_instance = Ingest(mock_config) ingest_instance.save_entity_data_to_raw(entity, data) mock_makedirs.assert_not_called() mock_file.assert_called_once_with(entity_file, "w") # Get the file handle and check the written content handle = mock_file() handle.write.assert_called() written_content = "".join(call.args[0] for call in handle.write.call_args_list) assert written_content == json.dumps(data, indent=4) mock_logging_info.assert_called_once_with( f"Saving {entity} data to {entity_file}" ) def test_save_entity_data_to_raw_error(): entity = "entity1" data = {"key": "value"} current_time = "20230101123000" directory = os.path.join(mock_config["raw_data_path"], entity) entity_file = f"{directory}/{current_time}.json" with ( patch("os.path.exists", return_value=True), patch("builtins.open", mock_open()) as mock_file, patch("time.strftime", return_value=current_time), patch("logging.info") as mock_logging_info, patch("logging.error") as mock_logging_error, ): mock_file.side_effect = Exception("Test error") ingest_instance = Ingest(mock_config) with pytest.raises(Exception, match="Test error"): ingest_instance.save_entity_data_to_raw(entity, data) mock_logging_error.assert_called_once_with( f"Failed to save data for {entity} to {entity_file}" ) def test_update_server_knowledge_cache_file_exists(): entity = "entity1" server_knowledge = {"key": "value"} existing_cache = {"entity2": {"key": "old_value"}} updated_cache = {"entity2": {"key": "old_value"}, "entity1": {"key": "value"}} with ( patch( "builtins.open", mock_open(read_data=json.dumps(existing_cache)) ) as mock_file, patch("os.path.exists", return_value=True), patch("logging.error") as mock_logging_error, ): ingest_instance = Ingest(mock_config) ingest_instance.update_server_knowledge_cache(entity, server_knowledge) mock_file.assert_called_with(mock_config["knowledge_file"], "w") handle = mock_file() handle.write.assert_called() written_content = "".join(call.args[0] for call in handle.write.call_args_list) assert json.loads(written_content) == updated_cache mock_logging_error.assert_not_called() def test_update_server_knowledge_cache_file_not_exists(): entity = "entity1" server_knowledge = {"key": "value"} updated_cache = {"entity1": {"key": "value"}} with ( patch("builtins.open", mock_open()) as mock_file, patch("os.path.exists", return_value=False), patch("os.makedirs") as mock_makedirs, patch("logging.info") as mock_logging_info, patch("logging.error") as mock_logging_error, ): # Ensure the side_effect list has enough elements to cover all calls to open mock_file.side_effect = [FileNotFoundError(), mock_open().return_value] ingest_instance = Ingest(mock_config) with pytest.raises(FileNotFoundError): ingest_instance.update_server_knowledge_cache(entity, server_knowledge) mock_makedirs.assert_called_once_with( os.path.dirname(mock_config["knowledge_file"]), exist_ok=True ) mock_file.assert_called_with(mock_config["knowledge_file"], "w") mock_logging_error.assert_called_once_with( f"Failed to update knowledge cache for {entity} in { mock_config['knowledge_file'] }" ) def test_update_server_knowledge_cache_write_error(): entity = "entity1" server_knowledge = {"key": "value"} with ( patch("builtins.open", mock_open()) as mock_file, patch("logging.error") as mock_logging_error, ): mock_file.side_effect = Exception("Test error") ingest_instance = Ingest(mock_config) with pytest.raises(Exception, match="Test error"): ingest_instance.update_server_knowledge_cache(entity, server_knowledge) mock_logging_error.assert_called_once_with( f"Failed to update knowledge cache for {entity} in { mock_config['knowledge_file'] }" ) def test_check_rate_limit_above_threshold(): response = MagicMock() response.headers = {"X-Rate-Limit": "10/100"} ingest_instance = Ingest(mock_config) result = ingest_instance.check_rate_limit(response) assert result is None def test_check_rate_limit_below_threshold(): response = MagicMock() response.headers = {"X-Rate-Limit": "90/100"} ingest_instance = Ingest(mock_config) result = ingest_instance.check_rate_limit(response) assert result is None def test_check_rate_limit_exceeded(): response = MagicMock() response.headers = {"X-Rate-Limit": "100/100"} ingest_instance = Ingest(mock_config) result = ingest_instance.check_rate_limit(response) assert result is True def test_check_rate_limit_header_missing(): response = MagicMock() response.headers = {} ingest_instance = Ingest(mock_config) result = ingest_instance.check_rate_limit(response) assert result is None def test_handle_response_bad_request(): response = MagicMock() response.status_code = 400 ingest_instance = Ingest(mock_config) with pytest.raises(SystemExit) as e: ingest_instance.handle_response(response) assert e.type is SystemExit assert e.value.code == ec.BAD_REQUEST def test_handle_response_unauthorized(): response = MagicMock() response.status_code = 401 ingest_instance = Ingest(mock_config) with pytest.raises(SystemExit) as e: ingest_instance.handle_response(response) assert e.type is SystemExit assert e.value.code == ec.UNAUTHORIZED_API_TOKEN def test_handle_response_forbidden(): response = MagicMock() response.status_code = 403 ingest_instance = Ingest(mock_config) with pytest.raises(SystemExit) as e: ingest_instance.handle_response(response) assert e.type is SystemExit assert e.value.code == ec.FORBIDDEN def test_handle_response_not_found(): response = MagicMock() response.status_code = 404 ingest_instance = Ingest(mock_config) with pytest.raises(SystemExit) as e: ingest_instance.handle_response(response) assert e.type is SystemExit assert e.value.code == ec.NOT_FOUND def test_handle_response_conflict(): response = MagicMock() response.status_code = 409 ingest_instance = Ingest(mock_config) with pytest.raises(SystemExit) as e: ingest_instance.handle_response(response) assert e.type is SystemExit assert e.value.code == ec.CONFLICT def test_handle_response_too_many_requests(): response = MagicMock() response.status_code = 429 ingest_instance = Ingest(mock_config) result = ingest_instance.handle_response(response) assert result is True def test_handle_response_internal_server_error(): response = MagicMock() response.status_code = 500 ingest_instance = Ingest(mock_config) result = ingest_instance.handle_response(response) assert result is True def test_handle_response_service_unavailable(): response = MagicMock() response.status_code = 503 ingest_instance = Ingest(mock_config) result = ingest_instance.handle_response(response) assert result is True def test_handle_response_ok(): response = MagicMock() response.status_code = 200 ingest_instance = Ingest(mock_config) result = ingest_instance.handle_response(response) assert result is False if __name__ == "__main__": pytest.main()