first pass at base layer

This commit is contained in:
Jake Pullen
2024-07-29 15:12:26 +01:00
parent 23458af133
commit 6bc992e011
4 changed files with 77 additions and 75 deletions
+74
View File
@@ -0,0 +1,74 @@
import pandas
import json
import os
import logging
from datetime import datetime
from typing import List
class RawToBase:
def __init__(self, entities: List[str], raw_data_path: str, base_data_path: str):
self.entities = entities
self.raw_data_path = raw_data_path
self.base_data_path = base_data_path
self.data = {}
self.base_data = {}
logging.basicConfig(level=logging.DEBUG)
self._load_raw_data()
self._load_existing_base_data()
self._combine_data()
self._resolve_duplicates()
self._save_base_data()
def _load_raw_data(self):
for entity in self.entities:
entity_path = os.path.join(self.raw_data_path, entity)
self.data[entity] = []
logging.debug(f"Loading data for entity: {entity} from path: {entity_path}")
for file_name in os.listdir(entity_path):
if file_name.endswith('.json'):
file_path = os.path.join(entity_path, file_name)
logging.debug(f"Reading file: {file_path}")
try:
with open(file_path, 'r') as f:
data = json.load(f)
for record in data:
record['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d').date()
self.data[entity].append(data)
logging.debug(f"Successfully loaded data from file: {file_path}")
except Exception as e:
logging.error(f"Failed to load data from file: {file_path}, error: {e}")
def _load_existing_base_data(self):
for entity in self.entities:
base_path = os.path.join(self.base_data_path, 'base', entity, f'{entity}.parquet')
if os.path.exists(base_path):
logging.debug(f"Loading existing base data for entity: {entity} from path: {base_path}")
self.base_data[entity] = pandas.read_parquet(base_path)
logging.debug(f"Successfully loaded existing base data for entity: {entity}")
else:
self.base_data[entity] = pandas.DataFrame()
logging.debug(f"No existing base data found for entity: {entity}, starting with an empty DataFrame")
def _combine_data(self):
for entity in self.entities:
logging.debug(f"Combining data for entity: {entity}")
combined_data = []
for data in self.data[entity]:
combined_data.extend(data)
new_data_df = pandas.DataFrame(combined_data)
self.base_data[entity] = pandas.concat([self.base_data[entity], new_data_df], ignore_index=True)
logging.debug(f"Successfully combined data for entity: {entity}")
def _resolve_duplicates(self):
for entity in self.entities:
logging.debug(f"Resolving duplicates for entity: {entity}")
self.base_data[entity] = self.base_data[entity].sort_values('ingestion_date', ascending=False).drop_duplicates('id', keep='first')
logging.debug(f"Successfully resolved duplicates for entity: {entity}")
def _save_base_data(self):
for entity in self.entities:
base_path = os.path.join(self.base_data_path, 'base', entity)
os.makedirs(base_path, exist_ok=True)
file_path = os.path.join(base_path, f'{entity}.parquet')
self.base_data[entity].to_parquet(file_path)
logging.debug(f"Saved base data for entity: {entity} to path: {file_path}")