diff --git a/ingest.py b/ingest.py index 3c7dd48..e0b4f70 100644 --- a/ingest.py +++ b/ingest.py @@ -33,7 +33,7 @@ class Ingest: Save the data for a specific entity to a new cache file. """ current_time = time.strftime('%Y%m%d%H%M%S') - directory = f'data/{entity}' + directory = f'data/raw/{entity}' if not os.path.exists(directory): os.makedirs(directory) entity_file = f'{directory}/{current_time}.json' diff --git a/main.py b/main.py index eed884a..b973522 100644 --- a/main.py +++ b/main.py @@ -2,6 +2,7 @@ import os import dotenv import logging from ingest import Ingest +from raw_to_base import RawToBase dotenv.load_dotenv() @@ -20,3 +21,4 @@ ingest_info['BUDGET_ID'] = BUDGET_ID Ingest(ingest_info) +RawToBase(entities, 'data/raw', 'data/base') \ No newline at end of file diff --git a/raw_to_base.ipynb b/raw_to_base.ipynb deleted file mode 100644 index 5171401..0000000 --- a/raw_to_base.ipynb +++ /dev/null @@ -1,74 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "ename": "", - "evalue": "", - "output_type": "error", - "traceback": [ - "\u001b[1;31mRunning cells with 'Python 3.12.4' requires the ipykernel package.\n", - "\u001b[1;31mRun the following command to install 'ipykernel' into the Python environment. \n", - "\u001b[1;31mCommand: '/bin/python3.12 -m pip install ipykernel -U --user --force-reinstall'" - ] - } - ], - "source": [ - "from pyspark.sql import SparkSession\n", - "from pyspark.sql.functions import *\n", - "from pyspark.sql.types import *\n", - "\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "ename": "", - "evalue": "", - "output_type": "error", - "traceback": [ - "\u001b[1;31mRunning cells with 'Python 3.12.4' requires the ipykernel package.\n", - "\u001b[1;31mRun the following command to install 'ipykernel' into the Python environment. \n", - "\u001b[1;31mCommand: '/bin/python3.12 -m pip install ipykernel -U --user --force-reinstall'" - ] - } - ], - "source": [ - "\n", - "spark = SparkSession.builder.appName(\"finance_dwh\").config(\"spark.memory.offHeap.enabled\",\"true\").config(\"spark.memory.offHeap.size\",\"10g\").getOrCreate()\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "\n", - "accounts_data = spark.read.json(\"data/20240728094708.json\")\n", - "accounts_data.printSchema()\n", - "#accounts_data.show()\n", - "\n" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "name": "python", - "version": "3.12.4" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/raw_to_base.py b/raw_to_base.py new file mode 100644 index 0000000..d038c17 --- /dev/null +++ b/raw_to_base.py @@ -0,0 +1,74 @@ +import pandas +import json +import os +import logging +from datetime import datetime +from typing import List + +class RawToBase: + def __init__(self, entities: List[str], raw_data_path: str, base_data_path: str): + self.entities = entities + self.raw_data_path = raw_data_path + self.base_data_path = base_data_path + self.data = {} + self.base_data = {} + logging.basicConfig(level=logging.DEBUG) + self._load_raw_data() + self._load_existing_base_data() + self._combine_data() + self._resolve_duplicates() + self._save_base_data() + + def _load_raw_data(self): + for entity in self.entities: + entity_path = os.path.join(self.raw_data_path, entity) + self.data[entity] = [] + logging.debug(f"Loading data for entity: {entity} from path: {entity_path}") + for file_name in os.listdir(entity_path): + if file_name.endswith('.json'): + file_path = os.path.join(entity_path, file_name) + logging.debug(f"Reading file: {file_path}") + try: + with open(file_path, 'r') as f: + data = json.load(f) + for record in data: + record['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d').date() + self.data[entity].append(data) + logging.debug(f"Successfully loaded data from file: {file_path}") + except Exception as e: + logging.error(f"Failed to load data from file: {file_path}, error: {e}") + + def _load_existing_base_data(self): + for entity in self.entities: + base_path = os.path.join(self.base_data_path, 'base', entity, f'{entity}.parquet') + if os.path.exists(base_path): + logging.debug(f"Loading existing base data for entity: {entity} from path: {base_path}") + self.base_data[entity] = pandas.read_parquet(base_path) + logging.debug(f"Successfully loaded existing base data for entity: {entity}") + else: + self.base_data[entity] = pandas.DataFrame() + logging.debug(f"No existing base data found for entity: {entity}, starting with an empty DataFrame") + + def _combine_data(self): + for entity in self.entities: + logging.debug(f"Combining data for entity: {entity}") + combined_data = [] + for data in self.data[entity]: + combined_data.extend(data) + new_data_df = pandas.DataFrame(combined_data) + self.base_data[entity] = pandas.concat([self.base_data[entity], new_data_df], ignore_index=True) + logging.debug(f"Successfully combined data for entity: {entity}") + + def _resolve_duplicates(self): + for entity in self.entities: + logging.debug(f"Resolving duplicates for entity: {entity}") + self.base_data[entity] = self.base_data[entity].sort_values('ingestion_date', ascending=False).drop_duplicates('id', keep='first') + logging.debug(f"Successfully resolved duplicates for entity: {entity}") + + def _save_base_data(self): + for entity in self.entities: + base_path = os.path.join(self.base_data_path, 'base', entity) + os.makedirs(base_path, exist_ok=True) + file_path = os.path.join(base_path, f'{entity}.parquet') + self.base_data[entity].to_parquet(file_path) + logging.debug(f"Saved base data for entity: {entity} to path: {file_path}") \ No newline at end of file