Patch summary (review notes; everything before the first "diff --git" header
is free text that git apply skips).
  * config/exit_codes.py: adds the exit code UNIQUE_ID_NOT_FOUND = 12.
  * data_check.py: new script that reads data/warehouse/transactions.parquet
    with polars, prints it, then prints the per-date sum of
    transaction_amount ordered by date descending.
  * pipeline/raw_to_base.py: logs the start and the successful end of each
    entity, adds _cast_struct_to_string, merges new rows into the existing
    base data inside _combine_data, and removes _resolve_duplicates together
    with its call in process_entities.
Review fix: the missing-unique-id branch now calls sys.exit(...) like the
other exit paths of this file instead of the bare exit(...) builtin.
NOTE(review): this patch was rebuilt from a whitespace-collapsed copy. Leading
indentation, blank context lines and the whitespace inside the SQL string are
inferred -- confirm against the target files, or apply with
--ignore-whitespace. The raw_to_base.py "index" post-image hash predates the
sys.exit fix.

diff --git a/config/exit_codes.py b/config/exit_codes.py
index 07d3e17..5c508c0 100644
--- a/config/exit_codes.py
+++ b/config/exit_codes.py
@@ -9,4 +9,5 @@ FORBIDDEN = 7
 NOT_FOUND = 8
 CONFLICT = 9
 MOVE_FILE_ERROR = 10
-DUPLICATE_RESOLUTION_ERROR = 11
\ No newline at end of file
+DUPLICATE_RESOLUTION_ERROR = 11
+UNIQUE_ID_NOT_FOUND = 12
\ No newline at end of file
diff --git a/data_check.py b/data_check.py
new file mode 100644
index 0000000..09b2c77
--- /dev/null
+++ b/data_check.py
@@ -0,0 +1,17 @@
+import polars as pl
+
+df = pl.read_parquet('data/warehouse/transactions.parquet')
+print("Data loaded from Parquet file:")
+print(df)
+
+relevant_data = df.sql('''
+    SELECT
+        date,
+        sum(transaction_amount) as total
+    FROM self
+    GROUP BY date
+    ORDER BY date DESC
+    '''
+)
+print("Data after SQL query:")
+print(relevant_data)
\ No newline at end of file
diff --git a/pipeline/raw_to_base.py b/pipeline/raw_to_base.py
index 1e2072a..932bb78 100644
--- a/pipeline/raw_to_base.py
+++ b/pipeline/raw_to_base.py
@@ -20,6 +20,7 @@ class RawToBase:
 
     def process_entities(self):
         for entity in self.entities:
+            logging.info(f"Processing entity: {entity}")
             # check the file is in the raw data path, if not skip the entity
             folder_path = os.path.join(self.raw_data_path, entity)
             folder_contents = os.listdir(folder_path)
@@ -31,15 +32,13 @@ class RawToBase:
                 continue
             self._load_existing_base_data(entity)
             self._combine_data(entity)
-            if not self._resolve_duplicates(entity):
-                logging.error(f"entity: {entity} failed duplicate resolution.")
-                sys.exit(ec.DUPLICATE_RESOLUTION_ERROR)
             if not self._save_base_data(entity):
                 logging.error(f"Skipping processing for entity: {entity} due to failed saving base data.")
                 continue
             if not self._move_raw_to_processed(entity):
                 logging.error(f"entity: {entity} has been processed, but we could not move the file out of the raw folder, please clear the raw folder for {entity}.")
                 sys.exit(ec.MOVE_FILE_ERROR)
+            logging.info(f"Successfully processed entity: {entity}")
 
     def _load_raw_data(self, entity):
         entity_path = os.path.join(self.raw_data_path, entity)
@@ -124,9 +123,23 @@ Then move the files back in one at a time oldest to newest and run again for eac
             self.base_data[entity] = pl.DataFrame()
             logging.debug(f"No existing base data found for entity: {entity}, starting with an empty DataFrame")
 
+    #Function to cast null Struct({'': Null}) columns to String
+    def _cast_struct_to_string(self,df):
+        for col in df.columns:
+            if df.schema[col] == pl.Struct({'': pl.Null}):
+                df = df.with_columns(
+                    pl.when(pl.col(col).is_null())
+                    .then(pl.lit("null"))
+                    .otherwise(pl.col(col).map_elements(lambda x: str(x) if x is not None else "null"))
+                    .alias(col)
+                )
+        return df
+
     def _combine_data(self, entity):
         logging.debug(f"Combining data for entity: {entity}")
         combined_data = []
+
+        # Combine data from the entity
         if entity == 'categories':
             for data in self.data[entity]:
                 for category in data:
@@ -141,22 +154,36 @@ Then move the files back in one at a time oldest to newest and run again for eac
         unique_id = self.primary_keys[entity]['unique_id']
         if unique_id not in new_data_df.columns:
             logging.error(f"Unique ID column '{unique_id}' not found in the combined data for entity: {entity}")
-            exit(1)
+            sys.exit(ec.UNIQUE_ID_NOT_FOUND)
+
+        # Cast columns in new_data_df
+        new_data_df = self._cast_struct_to_string(new_data_df)
+
+        # Merge new data with existing base data
+        if entity in self.base_data and not self.base_data[entity].is_empty():
+            existing_data_df = self.base_data[entity]
+
+            # Cast columns in existing_data_df
+            existing_data_df = self._cast_struct_to_string(existing_data_df)
+
+            # Identify new rows and rows to update
+            new_rows = new_data_df.filter(~pl.col(unique_id).is_in(existing_data_df[unique_id]))
+            updated_rows = new_data_df.filter(pl.col(unique_id).is_in(existing_data_df[unique_id]))
+
+            # Update existing rows
+            for row in updated_rows.iter_rows(named=True):
+                existing_data_df = existing_data_df.with_columns([
+                    pl.when(pl.col(unique_id) == row[unique_id]).then(pl.lit(row[col], allow_object=True)).otherwise(pl.col(col)).alias(col)
+                    for col in updated_rows.columns if col != unique_id
+                ])
+
+            # Add new rows
+            self.base_data[entity] = pl.concat([existing_data_df, new_rows])
+        else:
+            self.base_data[entity] = new_data_df
 
-        self.base_data[entity] = new_data_df
         logging.debug(f"Successfully combined data for entity: {entity}")
 
-    def _resolve_duplicates(self, entity):
-        logging.debug(f"Resolving duplicates for entity: {entity}")
-        unique_id = self.primary_keys[entity]['unique_id']
-        try:
-            self.base_data[entity] = self.base_data[entity].sort(by='ingestion_date').unique(subset=unique_id, keep='first')
-        except Exception as e:
-            logging.error(f"Failed to resolve duplicates for entity: {entity}, error: {e}")
-            return False
-        logging.debug(f"Successfully resolved duplicates for entity: {entity}")
-        return True
-
     def _save_base_data(self, entity):
         os.makedirs(self.base_data_path, exist_ok=True)
         file_path = os.path.join(self.base_data_path, f'{entity}.parquet')