From a966ad5ebdfcbc7628e84b48d320d8c9fd3c5328 Mon Sep 17 00:00:00 2001 From: Jake Pullen Date: Mon, 12 Aug 2024 16:51:06 +0100 Subject: [PATCH] better handling of merging the data in the combine, lead to not needing to handle duplicates --- config/exit_codes.py | 3 ++- pipeline/raw_to_base.py | 57 +++++++++++++++++++++++++++++------------ 2 files changed, 43 insertions(+), 17 deletions(-) diff --git a/config/exit_codes.py b/config/exit_codes.py index 07d3e17..5c508c0 100644 --- a/config/exit_codes.py +++ b/config/exit_codes.py @@ -9,4 +9,5 @@ FORBIDDEN = 7 NOT_FOUND = 8 CONFLICT = 9 MOVE_FILE_ERROR = 10 -DUPLICATE_RESOLUTION_ERROR = 11 \ No newline at end of file +DUPLICATE_RESOLUTION_ERROR = 11 +UNIQUE_ID_NOT_FOUND = 12 \ No newline at end of file diff --git a/pipeline/raw_to_base.py b/pipeline/raw_to_base.py index 255a8ad..932bb78 100644 --- a/pipeline/raw_to_base.py +++ b/pipeline/raw_to_base.py @@ -32,9 +32,6 @@ class RawToBase: continue self._load_existing_base_data(entity) self._combine_data(entity) - if not self._resolve_duplicates(entity): - logging.error(f"entity: {entity} failed duplicate resolution.") - sys.exit(ec.DUPLICATE_RESOLUTION_ERROR) if not self._save_base_data(entity): logging.error(f"Skipping processing for entity: {entity} due to failed saving base data.") continue @@ -126,9 +123,23 @@ Then move the files back in one at a time oldest to newest and run again for eac self.base_data[entity] = pl.DataFrame() logging.debug(f"No existing base data found for entity: {entity}, starting with an empty DataFrame") + #Function to cast null Struct({'': Null}) columns to String + def _cast_struct_to_string(self,df): + for col in df.columns: + if df.schema[col] == pl.Struct({'': pl.Null}): + df = df.with_columns( + pl.when(pl.col(col).is_null()) + .then(pl.lit("null")) + .otherwise(pl.col(col).map_elements(lambda x: str(x) if x is not None else "null")) + .alias(col) + ) + return df + def _combine_data(self, entity): logging.debug(f"Combining data for entity: {entity}") combined_data = [] + + # Combine data from the entity if entity == 'categories': for data in self.data[entity]: for category in data: @@ -143,22 +154,36 @@ Then move the files back in one at a time oldest to newest and run again for eac unique_id = self.primary_keys[entity]['unique_id'] if unique_id not in new_data_df.columns: logging.error(f"Unique ID column '{unique_id}' not found in the combined data for entity: {entity}") - exit(1) + exit(ec.UNIQUE_ID_NOT_FOUND) + + # Cast columns in new_data_df + new_data_df = self._cast_struct_to_string(new_data_df) + + # Merge new data with existing base data + if entity in self.base_data and not self.base_data[entity].is_empty(): + existing_data_df = self.base_data[entity] + + # Cast columns in existing_data_df + existing_data_df = self._cast_struct_to_string(existing_data_df) + + # Identify new rows and rows to update + new_rows = new_data_df.filter(~pl.col(unique_id).is_in(existing_data_df[unique_id])) + updated_rows = new_data_df.filter(pl.col(unique_id).is_in(existing_data_df[unique_id])) + + # Update existing rows + for row in updated_rows.iter_rows(named=True): + existing_data_df = existing_data_df.with_columns([ + pl.when(pl.col(unique_id) == row[unique_id]).then(pl.lit(row[col], allow_object=True)).otherwise(pl.col(col)).alias(col) + for col in updated_rows.columns if col != unique_id + ]) + + # Add new rows + self.base_data[entity] = pl.concat([existing_data_df, new_rows]) + else: + self.base_data[entity] = new_data_df - self.base_data[entity] = new_data_df logging.debug(f"Successfully combined data for entity: {entity}") - def _resolve_duplicates(self, entity): - logging.debug(f"Resolving duplicates for entity: {entity}") - unique_id = self.primary_keys[entity]['unique_id'] - try: - self.base_data[entity] = self.base_data[entity].sort(by='ingestion_date').unique(subset=unique_id, keep='first') - except Exception as e: - logging.error(f"Failed to resolve duplicates for entity: {entity}, error: {e}") - return False - logging.debug(f"Successfully resolved duplicates for entity: {entity}") - return True - def _save_base_data(self, entity): os.makedirs(self.base_data_path, exist_ok=True) file_path = os.path.join(self.base_data_path, f'{entity}.parquet')