Merge branch 'main' into feature/add_visuals
This commit is contained in:
@@ -9,4 +9,5 @@ FORBIDDEN = 7
|
|||||||
NOT_FOUND = 8
|
NOT_FOUND = 8
|
||||||
CONFLICT = 9
|
CONFLICT = 9
|
||||||
MOVE_FILE_ERROR = 10
|
MOVE_FILE_ERROR = 10
|
||||||
DUPLICATE_RESOLUTION_ERROR = 11
|
DUPLICATE_RESOLUTION_ERROR = 11
|
||||||
|
UNIQUE_ID_NOT_FOUND = 12
|
||||||
@@ -0,0 +1,17 @@
|
|||||||
|
import polars as pl
|
||||||
|
|
||||||
|
df = pl.read_parquet('data/warehouse/transactions.parquet')
|
||||||
|
print("Data loaded from Parquet file:")
|
||||||
|
print(df)
|
||||||
|
|
||||||
|
relevant_data = df.sql('''
|
||||||
|
SELECT
|
||||||
|
date,
|
||||||
|
sum(transaction_amount) as total
|
||||||
|
FROM self
|
||||||
|
GROUP BY date
|
||||||
|
ORDER BY date DESC
|
||||||
|
'''
|
||||||
|
)
|
||||||
|
print("Data after SQL query:")
|
||||||
|
print(relevant_data)
|
||||||
+43
-16
@@ -20,6 +20,7 @@ class RawToBase:
|
|||||||
|
|
||||||
def process_entities(self):
|
def process_entities(self):
|
||||||
for entity in self.entities:
|
for entity in self.entities:
|
||||||
|
logging.info(f"Processing entity: {entity}")
|
||||||
# check the file is in the raw data path, if not skip the entity
|
# check the file is in the raw data path, if not skip the entity
|
||||||
folder_path = os.path.join(self.raw_data_path, entity)
|
folder_path = os.path.join(self.raw_data_path, entity)
|
||||||
folder_contents = os.listdir(folder_path)
|
folder_contents = os.listdir(folder_path)
|
||||||
@@ -31,15 +32,13 @@ class RawToBase:
|
|||||||
continue
|
continue
|
||||||
self._load_existing_base_data(entity)
|
self._load_existing_base_data(entity)
|
||||||
self._combine_data(entity)
|
self._combine_data(entity)
|
||||||
if not self._resolve_duplicates(entity):
|
|
||||||
logging.error(f"entity: {entity} failed duplicate resolution.")
|
|
||||||
sys.exit(ec.DUPLICATE_RESOLUTION_ERROR)
|
|
||||||
if not self._save_base_data(entity):
|
if not self._save_base_data(entity):
|
||||||
logging.error(f"Skipping processing for entity: {entity} due to failed saving base data.")
|
logging.error(f"Skipping processing for entity: {entity} due to failed saving base data.")
|
||||||
continue
|
continue
|
||||||
if not self._move_raw_to_processed(entity):
|
if not self._move_raw_to_processed(entity):
|
||||||
logging.error(f"entity: {entity} has been processed, but we could not move the file out of the raw folder, please clear the raw folder for {entity}.")
|
logging.error(f"entity: {entity} has been processed, but we could not move the file out of the raw folder, please clear the raw folder for {entity}.")
|
||||||
sys.exit(ec.MOVE_FILE_ERROR)
|
sys.exit(ec.MOVE_FILE_ERROR)
|
||||||
|
logging.info(f"Successfully processed entity: {entity}")
|
||||||
|
|
||||||
def _load_raw_data(self, entity):
|
def _load_raw_data(self, entity):
|
||||||
entity_path = os.path.join(self.raw_data_path, entity)
|
entity_path = os.path.join(self.raw_data_path, entity)
|
||||||
@@ -124,9 +123,23 @@ Then move the files back in one at a time oldest to newest and run again for eac
|
|||||||
self.base_data[entity] = pl.DataFrame()
|
self.base_data[entity] = pl.DataFrame()
|
||||||
logging.debug(f"No existing base data found for entity: {entity}, starting with an empty DataFrame")
|
logging.debug(f"No existing base data found for entity: {entity}, starting with an empty DataFrame")
|
||||||
|
|
||||||
|
#Function to cast null Struct({'': Null}) columns to String
|
||||||
|
def _cast_struct_to_string(self,df):
|
||||||
|
for col in df.columns:
|
||||||
|
if df.schema[col] == pl.Struct({'': pl.Null}):
|
||||||
|
df = df.with_columns(
|
||||||
|
pl.when(pl.col(col).is_null())
|
||||||
|
.then(pl.lit("null"))
|
||||||
|
.otherwise(pl.col(col).map_elements(lambda x: str(x) if x is not None else "null"))
|
||||||
|
.alias(col)
|
||||||
|
)
|
||||||
|
return df
|
||||||
|
|
||||||
def _combine_data(self, entity):
|
def _combine_data(self, entity):
|
||||||
logging.debug(f"Combining data for entity: {entity}")
|
logging.debug(f"Combining data for entity: {entity}")
|
||||||
combined_data = []
|
combined_data = []
|
||||||
|
|
||||||
|
# Combine data from the entity
|
||||||
if entity == 'categories':
|
if entity == 'categories':
|
||||||
for data in self.data[entity]:
|
for data in self.data[entity]:
|
||||||
for category in data:
|
for category in data:
|
||||||
@@ -141,22 +154,36 @@ Then move the files back in one at a time oldest to newest and run again for eac
|
|||||||
unique_id = self.primary_keys[entity]['unique_id']
|
unique_id = self.primary_keys[entity]['unique_id']
|
||||||
if unique_id not in new_data_df.columns:
|
if unique_id not in new_data_df.columns:
|
||||||
logging.error(f"Unique ID column '{unique_id}' not found in the combined data for entity: {entity}")
|
logging.error(f"Unique ID column '{unique_id}' not found in the combined data for entity: {entity}")
|
||||||
exit(1)
|
exit(ec.UNIQUE_ID_NOT_FOUND)
|
||||||
|
|
||||||
|
# Cast columns in new_data_df
|
||||||
|
new_data_df = self._cast_struct_to_string(new_data_df)
|
||||||
|
|
||||||
|
# Merge new data with existing base data
|
||||||
|
if entity in self.base_data and not self.base_data[entity].is_empty():
|
||||||
|
existing_data_df = self.base_data[entity]
|
||||||
|
|
||||||
|
# Cast columns in existing_data_df
|
||||||
|
existing_data_df = self._cast_struct_to_string(existing_data_df)
|
||||||
|
|
||||||
|
# Identify new rows and rows to update
|
||||||
|
new_rows = new_data_df.filter(~pl.col(unique_id).is_in(existing_data_df[unique_id]))
|
||||||
|
updated_rows = new_data_df.filter(pl.col(unique_id).is_in(existing_data_df[unique_id]))
|
||||||
|
|
||||||
|
# Update existing rows
|
||||||
|
for row in updated_rows.iter_rows(named=True):
|
||||||
|
existing_data_df = existing_data_df.with_columns([
|
||||||
|
pl.when(pl.col(unique_id) == row[unique_id]).then(pl.lit(row[col], allow_object=True)).otherwise(pl.col(col)).alias(col)
|
||||||
|
for col in updated_rows.columns if col != unique_id
|
||||||
|
])
|
||||||
|
|
||||||
|
# Add new rows
|
||||||
|
self.base_data[entity] = pl.concat([existing_data_df, new_rows])
|
||||||
|
else:
|
||||||
|
self.base_data[entity] = new_data_df
|
||||||
|
|
||||||
self.base_data[entity] = new_data_df
|
|
||||||
logging.debug(f"Successfully combined data for entity: {entity}")
|
logging.debug(f"Successfully combined data for entity: {entity}")
|
||||||
|
|
||||||
def _resolve_duplicates(self, entity):
|
|
||||||
logging.debug(f"Resolving duplicates for entity: {entity}")
|
|
||||||
unique_id = self.primary_keys[entity]['unique_id']
|
|
||||||
try:
|
|
||||||
self.base_data[entity] = self.base_data[entity].sort(by='ingestion_date').unique(subset=unique_id, keep='first')
|
|
||||||
except Exception as e:
|
|
||||||
logging.error(f"Failed to resolve duplicates for entity: {entity}, error: {e}")
|
|
||||||
return False
|
|
||||||
logging.debug(f"Successfully resolved duplicates for entity: {entity}")
|
|
||||||
return True
|
|
||||||
|
|
||||||
def _save_base_data(self, entity):
|
def _save_base_data(self, entity):
|
||||||
os.makedirs(self.base_data_path, exist_ok=True)
|
os.makedirs(self.base_data_path, exist_ok=True)
|
||||||
file_path = os.path.join(self.base_data_path, f'{entity}.parquet')
|
file_path = os.path.join(self.base_data_path, f'{entity}.parquet')
|
||||||
|
|||||||
Reference in New Issue
Block a user