added better logging and moved config to its own folder
This commit is contained in:
@@ -7,3 +7,4 @@ data/*
|
||||
__pycache__/*
|
||||
*/__pycache__/*
|
||||
*.pbix
|
||||
/logs/*
|
||||
@@ -0,0 +1,41 @@
|
||||
import datetime as dt
|
||||
import json
|
||||
import logging
|
||||
from typing import override
|
||||
|
||||
class custom_json_logger(logging.Formatter):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
format_keys: dict[str,str] | None = None,
|
||||
):
|
||||
super().__init__()
|
||||
self.format_keys = format_keys if format_keys is not None else {}
|
||||
|
||||
@override
|
||||
def format(self, record: logging.LogRecord) -> str:
|
||||
record_dict = self._prepare_log_dict(record)
|
||||
return json.dumps(record_dict, default=str)
|
||||
|
||||
def _prepare_log_dict(self, record: logging.LogRecord) -> dict:
|
||||
always_fields = {
|
||||
"message" : record.getMessage(),
|
||||
"timestamp" : dt.datetime.fromtimestamp(
|
||||
record.created, tz=dt.timezone.utc
|
||||
).isoformat(),
|
||||
}
|
||||
if record.exc_info is not None:
|
||||
always_fields["exc_info"] = self.formatException(record.exc_info)
|
||||
|
||||
if record.stack_info is not None:
|
||||
always_fields["stack_info"] = self.formatStack(record.stack_info)
|
||||
|
||||
message = {
|
||||
key: msg_val
|
||||
if (msg_val := always_fields.pop(val, None)) is not None
|
||||
else getattr(record, val)
|
||||
for key, val in self.format_keys.items()
|
||||
}
|
||||
message.update(always_fields)
|
||||
return message
|
||||
|
||||
@@ -0,0 +1,3 @@
|
||||
SUCCESS = 0
|
||||
MISSING_ENV_VARS = 1
|
||||
MISSING_CONFIG_FILE = 2
|
||||
@@ -0,0 +1,41 @@
|
||||
version: 1
|
||||
disable_existing_loggers: False
|
||||
formatters:
|
||||
simple:
|
||||
format: "%(asctime)s - %(levelname)s - %(module)s - %(funcName)s - %(message)s"
|
||||
datefmt: "%Y-%m-%d %H:%M:%S%z"
|
||||
json:
|
||||
"()": config.custom_json_logger.custom_json_logger
|
||||
format_keys:
|
||||
level: levelname
|
||||
timestamp: timestamp
|
||||
logger: name
|
||||
module: module
|
||||
function: funcName
|
||||
line: lineno
|
||||
message: message
|
||||
thread_name: threadName
|
||||
handlers:
|
||||
stderr:
|
||||
class: logging.StreamHandler
|
||||
level: INFO
|
||||
formatter: simple
|
||||
stream: ext://sys.stdout
|
||||
file:
|
||||
class: logging.handlers.RotatingFileHandler
|
||||
level: DEBUG
|
||||
formatter: json
|
||||
filename: logs/dpfy_log.jsonl
|
||||
maxBytes: 10485760 # 10MB
|
||||
backupCount: 10
|
||||
queue_handler:
|
||||
class: logging.handlers.QueueHandler
|
||||
handlers:
|
||||
- stderr
|
||||
- file
|
||||
respect_handler_level: True
|
||||
loggers:
|
||||
root:
|
||||
level: DEBUG
|
||||
handlers:
|
||||
- queue_handler
|
||||
@@ -2,28 +2,49 @@ import os
|
||||
import dotenv
|
||||
import logging
|
||||
import yaml
|
||||
import sys
|
||||
import atexit
|
||||
import logging.config
|
||||
import logging.handlers
|
||||
|
||||
import config.exit_codes as ec
|
||||
from pipeline.ingest import Ingest
|
||||
from pipeline.raw_to_base import RawToBase
|
||||
from pipeline.dimensions import DimAccounts, DimCategories, DimPayees, DimDate
|
||||
from pipeline.facts import FactTransactions, FactScheduledTransactions
|
||||
|
||||
def set_up_logging():
|
||||
with open('config/logging_config.yaml', 'r') as f:
|
||||
try:
|
||||
log_config = yaml.safe_load(f)
|
||||
except yaml.YAMLError as e:
|
||||
print(e)
|
||||
logging.config.dictConfig(log_config)
|
||||
queue_handler = logging.getHandlerByName('queue_handler')
|
||||
if queue_handler is not None:
|
||||
queue_handler.listener.start()
|
||||
atexit.register(queue_handler.listener.stop)
|
||||
|
||||
logger = logging.getLogger("data_pipeline_for_ynab")
|
||||
os.makedirs('logs', exist_ok=True)
|
||||
set_up_logging()
|
||||
# Load environment variables
|
||||
dotenv.load_dotenv()
|
||||
|
||||
API_TOKEN = os.getenv('API_TOKEN')
|
||||
BUDGET_ID = os.getenv('BUDGET_ID')
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
if not API_TOKEN or not BUDGET_ID:
|
||||
logging.error('API_TOKEN or BUDGET_ID is not set in .env file')
|
||||
exit(1)
|
||||
|
||||
with open('config.yaml', 'r') as file:
|
||||
def main():
|
||||
if not API_TOKEN or not BUDGET_ID:
|
||||
logging.error('API_TOKEN or BUDGET_ID is not set in .env file')
|
||||
sys.exit(ec.MISSING_ENV_VARS)
|
||||
|
||||
with open('config/config.yaml', 'r') as file:
|
||||
config = yaml.safe_load(file)
|
||||
|
||||
config['API_TOKEN'] = API_TOKEN
|
||||
config['BUDGET_ID'] = BUDGET_ID
|
||||
config['API_TOKEN'] = API_TOKEN
|
||||
config['BUDGET_ID'] = BUDGET_ID
|
||||
|
||||
if __name__ == '__main__':
|
||||
Ingest(config)
|
||||
RawToBase(config)
|
||||
DimAccounts(config)
|
||||
@@ -32,3 +53,11 @@ if __name__ == '__main__':
|
||||
DimDate(config)
|
||||
FactTransactions(config)
|
||||
FactScheduledTransactions(config)
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
main()
|
||||
except SystemExit as e:
|
||||
exit_code = e.code
|
||||
logging.error(f'Program exited with code {exit_code}')
|
||||
raise
|
||||
Reference in New Issue
Block a user