almost into base nicely

This commit is contained in:
Jake Pullen
2024-08-06 22:38:18 +01:00
parent eb99746bdb
commit 22d7df2224
5 changed files with 110 additions and 63 deletions
+3 -1
View File
@@ -2,4 +2,6 @@
*.csv *.csv
*.log *.log
server_knowledge_cache.json server_knowledge_cache.json
data/* data/*
.venv/*
__pycache__/*
+3 -3
View File
@@ -10,7 +10,7 @@ API_TOKEN = os.getenv('API_TOKEN')
BUDGET_ID = os.getenv('BUDGET_ID') BUDGET_ID = os.getenv('BUDGET_ID')
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
entities = ['accounts', 'categories', 'months', 'payees', 'transactions', 'scheduled_transactions'] entities = ['accounts', 'categories', 'months', 'payees', 'transactions']#, 'scheduled_transactions']
ingest_info = {} ingest_info = {}
ingest_info['entities'] = entities ingest_info['entities'] = entities
@@ -20,5 +20,5 @@ ingest_info['API_TOKEN'] = API_TOKEN
ingest_info['BUDGET_ID'] = BUDGET_ID ingest_info['BUDGET_ID'] = BUDGET_ID
Ingest(ingest_info) #Ingest(ingest_info)
RawToBase(entities, 'data/raw', 'data/base') RawToBase(entities)
+89 -58
View File
@@ -1,74 +1,105 @@
import pandas
import json
import os import os
import json
import logging import logging
from datetime import datetime from datetime import datetime
from typing import List from typing import List, Dict, Any
import polars as pl
class RawToBase: class RawToBase:
def __init__(self, entities: List[str], raw_data_path: str, base_data_path: str): def __init__(self, entities: List[str]):
self.entities = entities self.entities = entities
self.raw_data_path = raw_data_path self.config = {
self.base_data_path = base_data_path 'accounts': {'unique_id': 'accounts_id'},
'categories': {'unique_id': 'categories_id'},
'months': {'unique_id': 'months_month'},
'payees': {'unique_id': 'payees_id'},
'transactions': {'unique_id': 'transactions_id'},
'scheduled_transactions': {'unique_id': 'id'}
}
self.raw_data_path = 'data/raw'
self.base_data_path = 'data/base'
self.data = {} self.data = {}
self.base_data = {} self.base_data = {}
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
self._load_raw_data() self.process_entities()
self._load_existing_base_data()
self._combine_data()
self._resolve_duplicates()
self._save_base_data()
def _load_raw_data(self): def process_entities(self):
for entity in self.entities: for entity in self.entities:
entity_path = os.path.join(self.raw_data_path, entity) self._load_raw_data(entity)
self.data[entity] = [] self._load_existing_base_data(entity)
logging.debug(f"Loading data for entity: {entity} from path: {entity_path}") self._combine_data(entity)
for file_name in os.listdir(entity_path): #self._resolve_duplicates(entity)
if file_name.endswith('.json'): self._save_base_data(entity)
file_path = os.path.join(entity_path, file_name)
logging.debug(f"Reading file: {file_path}")
try:
with open(file_path, 'r') as f:
data = json.load(f)
for record in data:
record['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d').date()
self.data[entity].append(data)
logging.debug(f"Successfully loaded data from file: {file_path}")
except Exception as e:
logging.error(f"Failed to load data from file: {file_path}, error: {e}")
def _load_existing_base_data(self): def _load_raw_data(self, entity):
for entity in self.entities: entity_path = os.path.join(self.raw_data_path, entity)
base_path = os.path.join(self.base_data_path, 'base', entity, f'{entity}.parquet') self.data[entity] = []
if os.path.exists(base_path): logging.debug(f"Loading data for entity: {entity} from path: {entity_path}")
logging.debug(f"Loading existing base data for entity: {entity} from path: {base_path}")
self.base_data[entity] = pandas.read_parquet(base_path) for file_name in os.listdir(entity_path):
logging.debug(f"Successfully loaded existing base data for entity: {entity}") if file_name.endswith('.json'):
else: file_path = os.path.join(entity_path, file_name)
self.base_data[entity] = pandas.DataFrame() logging.debug(f"Reading file: {file_path}")
logging.debug(f"No existing base data found for entity: {entity}, starting with an empty DataFrame") try:
with open(file_path, 'r') as f:
data = json.load(f)
modified_data = []
for record in data.get(f'{entity}', []):
if isinstance(record, dict):
record['ingestion_date'] = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()
modified_data.append(record)
else:
modified_data.append({'record': record, 'ingestion_date': datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()})
self.data[entity].append(modified_data)
logging.debug(f"Successfully loaded data from file: {file_path}")
except Exception as e:
logging.error(f"Failed to load data from file: {file_path}, error: {e}")
exit(1)
def _combine_data(self): def _load_existing_base_data(self, entity):
for entity in self.entities: base_path = os.path.join(self.base_data_path, 'base', entity, f'{entity}.parquet')
logging.debug(f"Combining data for entity: {entity}") if os.path.exists(base_path):
combined_data = [] logging.debug(f"Loading existing base data for entity: {entity} from path: {base_path}")
self.base_data[entity] = pl.read_parquet(base_path)
logging.debug(f"Successfully loaded existing base data for entity: {entity}")
else:
self.base_data[entity] = pl.DataFrame()
logging.debug(f"No existing base data found for entity: {entity}, starting with an empty DataFrame")
def _combine_data(self, entity):
logging.debug(f"Combining data for entity: {entity}")
combined_data = []
if entity == 'categories':
for data in self.data[entity]:
for group in data:
if 'category_groups' in group:
for category_group in group['category_groups']:
for category in category_group['categories']:
combined_data.append(category)
else:
for data in self.data[entity]: for data in self.data[entity]:
combined_data.extend(data) combined_data.extend(data)
new_data_df = pandas.DataFrame(combined_data)
self.base_data[entity] = pandas.concat([self.base_data[entity], new_data_df], ignore_index=True) new_data_df = pl.DataFrame(combined_data)
logging.debug(f"Successfully combined data for entity: {entity}")
# Ensure the unique id column is preserved
def _resolve_duplicates(self): # unique_id = self.config[entity]['unique_id']
for entity in self.entities: # if unique_id not in new_data_df.columns:
logging.debug(f"Resolving duplicates for entity: {entity}") # logging.error(f"Unique ID column '{unique_id}' not found in the combined data for entity: {entity}")
self.base_data[entity] = self.base_data[entity].sort_values('ingestion_date', ascending=False).drop_duplicates('id', keep='first') # exit(1)
logging.debug(f"Successfully resolved duplicates for entity: {entity}")
self.base_data[entity] = new_data_df
logging.debug(f"Successfully combined data for entity: {entity}")
def _save_base_data(self): def _resolve_duplicates(self, entity):
for entity in self.entities: logging.debug(f"Resolving duplicates for entity: {entity}")
base_path = os.path.join(self.base_data_path, 'base', entity) unique_id = self.config[entity]['unique_id']
os.makedirs(base_path, exist_ok=True) self.base_data[entity] = self.base_data[entity].sort(by='ingestion_date').unique(subset=unique_id, keep='first')
file_path = os.path.join(base_path, f'{entity}.parquet') logging.debug(f"Successfully resolved duplicates for entity: {entity}")
self.base_data[entity].to_parquet(file_path)
logging.debug(f"Saved base data for entity: {entity} to path: {file_path}") def _save_base_data(self, entity):
os.makedirs(self.base_data_path, exist_ok=True)
file_path = os.path.join(self.base_data_path, f'{entity}.parquet')
self.base_data[entity].write_parquet(file_path)
logging.debug(f"Saved base data for entity: {entity} to path: {file_path}")
+2 -1
View File
@@ -1,2 +1,3 @@
python-dotenv python-dotenv
polars polars
requests
+13
View File
@@ -0,0 +1,13 @@
import polars as pl
entities = ['accounts', 'categories', 'months', 'payees', 'transactions', 'scheduled_transactions']
# Define the path to the transactions parquet file
#file_path = 'data/base/categories.parquet'
file_path = 'data/base/accounts.parquet'
# Read the parquet file into a polars DataFrame
transactions_df = pl.read_parquet(file_path)
# Display the DataFrame
print(transactions_df)