From d155a4c907356d197dc46cf247aa5cafeb6a33de Mon Sep 17 00:00:00 2001 From: Jake Date: Sat, 8 Feb 2025 13:59:21 +0000 Subject: [PATCH] Fixed an issue with saving accounts data --- .gitignore | 3 +- config/config.yaml | 2 +- pipeline/raw_to_base.py | 62 ++++++++++++++++++++++------------------- 3 files changed, 37 insertions(+), 30 deletions(-) diff --git a/.gitignore b/.gitignore index 364b460..a185ba9 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,5 @@ data/* __pycache__/* */__pycache__/* *.pbix -/logs/* \ No newline at end of file +/logs/* +test.py diff --git a/config/config.yaml b/config/config.yaml index 83d3833..ae67d94 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -25,4 +25,4 @@ processed_data_path: data/processed base_data_path: data/base warehouse_data_path: data/warehouse REQUESTS_MAX_RETRIES: 3 -REQUESTS_RETRY_DELAY: 5 \ No newline at end of file +REQUESTS_RETRY_DELAY: 5 diff --git a/pipeline/raw_to_base.py b/pipeline/raw_to_base.py index 88bac37..c1d63d7 100644 --- a/pipeline/raw_to_base.py +++ b/pipeline/raw_to_base.py @@ -39,20 +39,20 @@ class RawToBase: logging.error(f"entity: {entity} has been processed, but we could not move the file out of the raw folder, please clear the raw folder for {entity}.") sys.exit(ec.MOVE_FILE_ERROR) logging.info(f"Successfully processed entity: {entity}") - + def _load_raw_data(self, entity): entity_path = os.path.join(self.raw_data_path, entity) self.data[entity] = [] logging.debug(f"Loading data for entity: {entity} from path: {entity_path}") - + files = [f for f in os.listdir(entity_path) if f.endswith('.json')] - + if len(files) > 1: logging.error(f"""More than one file found in path: {entity_path}. Skipping processing for entity: {entity}. recommended actions is to move the newest file(s) out, re-run main.py. Then move the files back in one at a time oldest to newest and run again for each file""") return False - + if len(files) == 1: file_name = files[0] file_path = os.path.join(entity_path, file_name) @@ -63,11 +63,17 @@ Then move the files back in one at a time oldest to newest and run again for eac except Exception as e: logging.error(f"Failed to load data from file: {file_path}, error: {e}") return False - + if self._is_data_empty(entity, data, file_path): return False - + modified_data = self._add_ingestion_date(entity, data, file_name) + for index, record in enumerate(modified_data): + logging.debug(f"processing record: {record}") + filtered_record = {k: v for k, v in record.items() if not k.startswith('debt_')} + modified_data[index] = filtered_record + logging.debug(f"filtered record: {filtered_record}") + logging.debug(f"modified data: {modified_data}") self.data[entity].append(modified_data) logging.debug(f"Successfully loaded data from file: {file_path}") @@ -88,11 +94,11 @@ Then move the files back in one at a time oldest to newest and run again for eac return True logging.debug(f"Data is not empty for entity: {entity}") return False - + def _add_ingestion_date(self, entity, data, file_name): modified_data = [] ingestion_date = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date() - + logging.debug(f"Adding ingestion date to data for entity: {entity}") if entity == 'categories': for group in data.get('category_groups', []): @@ -122,7 +128,7 @@ Then move the files back in one at a time oldest to newest and run again for eac else: self.base_data[entity] = pl.DataFrame() logging.debug(f"No existing base data found for entity: {entity}, starting with an empty DataFrame") - + #Function to cast null Struct({'': Null}) columns to String def _cast_struct_to_string(self,df): for col in df.columns: @@ -138,7 +144,7 @@ Then move the files back in one at a time oldest to newest and run again for eac def _combine_data(self, entity): logging.debug(f"Combining data for entity: {entity}") combined_data = [] - + # Combine data from the entity if entity == 'categories': for data in self.data[entity]: @@ -147,41 +153,41 @@ Then move the files back in one at a time oldest to newest and run again for eac else: for data in self.data[entity]: combined_data.extend(data) - + new_data_df = pl.DataFrame(combined_data) - + # Ensure the unique id column is preserved unique_id = self.primary_keys[entity]['unique_id'] if unique_id not in new_data_df.columns: logging.error(f"Unique ID column '{unique_id}' not found in the combined data for entity: {entity}") exit(ec.UNIQUE_ID_NOT_FOUND) - + # Cast columns in new_data_df new_data_df = self._cast_struct_to_string(new_data_df) - + # Merge new data with existing base data if entity in self.base_data and not self.base_data[entity].is_empty(): existing_data_df = self.base_data[entity] - + # Cast columns in existing_data_df existing_data_df = self._cast_struct_to_string(existing_data_df) - + # Identify new rows and rows to update new_rows = new_data_df.filter(~pl.col(unique_id).is_in(existing_data_df[unique_id])) updated_rows = new_data_df.filter(pl.col(unique_id).is_in(existing_data_df[unique_id])) - + # Update existing rows for row in updated_rows.iter_rows(named=True): existing_data_df = existing_data_df.with_columns([ pl.when(pl.col(unique_id) == row[unique_id]).then(pl.lit(row[col], allow_object=True)).otherwise(pl.col(col)).alias(col) for col in updated_rows.columns if col != unique_id ]) - + # Add new rows self.base_data[entity] = pl.concat([existing_data_df, new_rows]) else: self.base_data[entity] = new_data_df - + logging.debug(f"Successfully combined data for entity: {entity}") def _save_base_data(self, entity): @@ -194,34 +200,34 @@ Then move the files back in one at a time oldest to newest and run again for eac return False logging.debug(f"Saved base data for entity: {entity} to path: {file_path}") return True - + def _move_raw_to_processed(self, entity): raw_entity_path = os.path.join(self.raw_data_path, entity) processed_path = os.path.join(self.processed_data_path, entity) - + os.makedirs(processed_path, exist_ok=True) - + try: files = [f for f in os.listdir(raw_entity_path) if f.endswith('.json')] if len(files) != 1: logging.error(f"Expected exactly one file in path: {raw_entity_path}, but found {len(files)}") return False - + file_name = files[0] raw_file_path = os.path.join(raw_entity_path, file_name) processed_file_path = os.path.join(processed_path, file_name) - + logging.debug(f"Moving file: {raw_file_path} to {processed_file_path}") - + os.rename(raw_file_path, processed_file_path) logging.debug(f"Moved file: {file_name} to processed") - + except FileNotFoundError as e: logging.error(f"File not found: {e}") return False except Exception as e: logging.error(f"Failed to move file for entity: {entity}, error: {e}") return False - + logging.debug(f"Moved processed file for entity: {entity} to path: {processed_path}") - return True \ No newline at end of file + return True