Fixed an issue with saving accounts data
This commit is contained in:
+2
-1
@@ -7,4 +7,5 @@ data/*
|
|||||||
__pycache__/*
|
__pycache__/*
|
||||||
*/__pycache__/*
|
*/__pycache__/*
|
||||||
*.pbix
|
*.pbix
|
||||||
/logs/*
|
/logs/*
|
||||||
|
test.py
|
||||||
|
|||||||
+1
-1
@@ -25,4 +25,4 @@ processed_data_path: data/processed
|
|||||||
base_data_path: data/base
|
base_data_path: data/base
|
||||||
warehouse_data_path: data/warehouse
|
warehouse_data_path: data/warehouse
|
||||||
REQUESTS_MAX_RETRIES: 3
|
REQUESTS_MAX_RETRIES: 3
|
||||||
REQUESTS_RETRY_DELAY: 5
|
REQUESTS_RETRY_DELAY: 5
|
||||||
|
|||||||
+34
-28
@@ -39,20 +39,20 @@ class RawToBase:
|
|||||||
logging.error(f"entity: {entity} has been processed, but we could not move the file out of the raw folder, please clear the raw folder for {entity}.")
|
logging.error(f"entity: {entity} has been processed, but we could not move the file out of the raw folder, please clear the raw folder for {entity}.")
|
||||||
sys.exit(ec.MOVE_FILE_ERROR)
|
sys.exit(ec.MOVE_FILE_ERROR)
|
||||||
logging.info(f"Successfully processed entity: {entity}")
|
logging.info(f"Successfully processed entity: {entity}")
|
||||||
|
|
||||||
def _load_raw_data(self, entity):
|
def _load_raw_data(self, entity):
|
||||||
entity_path = os.path.join(self.raw_data_path, entity)
|
entity_path = os.path.join(self.raw_data_path, entity)
|
||||||
self.data[entity] = []
|
self.data[entity] = []
|
||||||
logging.debug(f"Loading data for entity: {entity} from path: {entity_path}")
|
logging.debug(f"Loading data for entity: {entity} from path: {entity_path}")
|
||||||
|
|
||||||
files = [f for f in os.listdir(entity_path) if f.endswith('.json')]
|
files = [f for f in os.listdir(entity_path) if f.endswith('.json')]
|
||||||
|
|
||||||
if len(files) > 1:
|
if len(files) > 1:
|
||||||
logging.error(f"""More than one file found in path: {entity_path}. Skipping processing for entity: {entity}.
|
logging.error(f"""More than one file found in path: {entity_path}. Skipping processing for entity: {entity}.
|
||||||
recommended actions is to move the newest file(s) out, re-run main.py.
|
recommended actions is to move the newest file(s) out, re-run main.py.
|
||||||
Then move the files back in one at a time oldest to newest and run again for each file""")
|
Then move the files back in one at a time oldest to newest and run again for each file""")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if len(files) == 1:
|
if len(files) == 1:
|
||||||
file_name = files[0]
|
file_name = files[0]
|
||||||
file_path = os.path.join(entity_path, file_name)
|
file_path = os.path.join(entity_path, file_name)
|
||||||
@@ -63,11 +63,17 @@ Then move the files back in one at a time oldest to newest and run again for eac
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Failed to load data from file: {file_path}, error: {e}")
|
logging.error(f"Failed to load data from file: {file_path}, error: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if self._is_data_empty(entity, data, file_path):
|
if self._is_data_empty(entity, data, file_path):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
modified_data = self._add_ingestion_date(entity, data, file_name)
|
modified_data = self._add_ingestion_date(entity, data, file_name)
|
||||||
|
for index, record in enumerate(modified_data):
|
||||||
|
logging.debug(f"processing record: {record}")
|
||||||
|
filtered_record = {k: v for k, v in record.items() if not k.startswith('debt_')}
|
||||||
|
modified_data[index] = filtered_record
|
||||||
|
logging.debug(f"filtered record: {filtered_record}")
|
||||||
|
logging.debug(f"modified data: {modified_data}")
|
||||||
|
|
||||||
self.data[entity].append(modified_data)
|
self.data[entity].append(modified_data)
|
||||||
logging.debug(f"Successfully loaded data from file: {file_path}")
|
logging.debug(f"Successfully loaded data from file: {file_path}")
|
||||||
@@ -88,11 +94,11 @@ Then move the files back in one at a time oldest to newest and run again for eac
|
|||||||
return True
|
return True
|
||||||
logging.debug(f"Data is not empty for entity: {entity}")
|
logging.debug(f"Data is not empty for entity: {entity}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def _add_ingestion_date(self, entity, data, file_name):
|
def _add_ingestion_date(self, entity, data, file_name):
|
||||||
modified_data = []
|
modified_data = []
|
||||||
ingestion_date = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()
|
ingestion_date = datetime.strptime(file_name.split('.')[0], '%Y%m%d%H%M%S').date()
|
||||||
|
|
||||||
logging.debug(f"Adding ingestion date to data for entity: {entity}")
|
logging.debug(f"Adding ingestion date to data for entity: {entity}")
|
||||||
if entity == 'categories':
|
if entity == 'categories':
|
||||||
for group in data.get('category_groups', []):
|
for group in data.get('category_groups', []):
|
||||||
@@ -122,7 +128,7 @@ Then move the files back in one at a time oldest to newest and run again for eac
|
|||||||
else:
|
else:
|
||||||
self.base_data[entity] = pl.DataFrame()
|
self.base_data[entity] = pl.DataFrame()
|
||||||
logging.debug(f"No existing base data found for entity: {entity}, starting with an empty DataFrame")
|
logging.debug(f"No existing base data found for entity: {entity}, starting with an empty DataFrame")
|
||||||
|
|
||||||
#Function to cast null Struct({'': Null}) columns to String
|
#Function to cast null Struct({'': Null}) columns to String
|
||||||
def _cast_struct_to_string(self,df):
|
def _cast_struct_to_string(self,df):
|
||||||
for col in df.columns:
|
for col in df.columns:
|
||||||
@@ -138,7 +144,7 @@ Then move the files back in one at a time oldest to newest and run again for eac
|
|||||||
def _combine_data(self, entity):
|
def _combine_data(self, entity):
|
||||||
logging.debug(f"Combining data for entity: {entity}")
|
logging.debug(f"Combining data for entity: {entity}")
|
||||||
combined_data = []
|
combined_data = []
|
||||||
|
|
||||||
# Combine data from the entity
|
# Combine data from the entity
|
||||||
if entity == 'categories':
|
if entity == 'categories':
|
||||||
for data in self.data[entity]:
|
for data in self.data[entity]:
|
||||||
@@ -147,41 +153,41 @@ Then move the files back in one at a time oldest to newest and run again for eac
|
|||||||
else:
|
else:
|
||||||
for data in self.data[entity]:
|
for data in self.data[entity]:
|
||||||
combined_data.extend(data)
|
combined_data.extend(data)
|
||||||
|
|
||||||
new_data_df = pl.DataFrame(combined_data)
|
new_data_df = pl.DataFrame(combined_data)
|
||||||
|
|
||||||
# Ensure the unique id column is preserved
|
# Ensure the unique id column is preserved
|
||||||
unique_id = self.primary_keys[entity]['unique_id']
|
unique_id = self.primary_keys[entity]['unique_id']
|
||||||
if unique_id not in new_data_df.columns:
|
if unique_id not in new_data_df.columns:
|
||||||
logging.error(f"Unique ID column '{unique_id}' not found in the combined data for entity: {entity}")
|
logging.error(f"Unique ID column '{unique_id}' not found in the combined data for entity: {entity}")
|
||||||
exit(ec.UNIQUE_ID_NOT_FOUND)
|
exit(ec.UNIQUE_ID_NOT_FOUND)
|
||||||
|
|
||||||
# Cast columns in new_data_df
|
# Cast columns in new_data_df
|
||||||
new_data_df = self._cast_struct_to_string(new_data_df)
|
new_data_df = self._cast_struct_to_string(new_data_df)
|
||||||
|
|
||||||
# Merge new data with existing base data
|
# Merge new data with existing base data
|
||||||
if entity in self.base_data and not self.base_data[entity].is_empty():
|
if entity in self.base_data and not self.base_data[entity].is_empty():
|
||||||
existing_data_df = self.base_data[entity]
|
existing_data_df = self.base_data[entity]
|
||||||
|
|
||||||
# Cast columns in existing_data_df
|
# Cast columns in existing_data_df
|
||||||
existing_data_df = self._cast_struct_to_string(existing_data_df)
|
existing_data_df = self._cast_struct_to_string(existing_data_df)
|
||||||
|
|
||||||
# Identify new rows and rows to update
|
# Identify new rows and rows to update
|
||||||
new_rows = new_data_df.filter(~pl.col(unique_id).is_in(existing_data_df[unique_id]))
|
new_rows = new_data_df.filter(~pl.col(unique_id).is_in(existing_data_df[unique_id]))
|
||||||
updated_rows = new_data_df.filter(pl.col(unique_id).is_in(existing_data_df[unique_id]))
|
updated_rows = new_data_df.filter(pl.col(unique_id).is_in(existing_data_df[unique_id]))
|
||||||
|
|
||||||
# Update existing rows
|
# Update existing rows
|
||||||
for row in updated_rows.iter_rows(named=True):
|
for row in updated_rows.iter_rows(named=True):
|
||||||
existing_data_df = existing_data_df.with_columns([
|
existing_data_df = existing_data_df.with_columns([
|
||||||
pl.when(pl.col(unique_id) == row[unique_id]).then(pl.lit(row[col], allow_object=True)).otherwise(pl.col(col)).alias(col)
|
pl.when(pl.col(unique_id) == row[unique_id]).then(pl.lit(row[col], allow_object=True)).otherwise(pl.col(col)).alias(col)
|
||||||
for col in updated_rows.columns if col != unique_id
|
for col in updated_rows.columns if col != unique_id
|
||||||
])
|
])
|
||||||
|
|
||||||
# Add new rows
|
# Add new rows
|
||||||
self.base_data[entity] = pl.concat([existing_data_df, new_rows])
|
self.base_data[entity] = pl.concat([existing_data_df, new_rows])
|
||||||
else:
|
else:
|
||||||
self.base_data[entity] = new_data_df
|
self.base_data[entity] = new_data_df
|
||||||
|
|
||||||
logging.debug(f"Successfully combined data for entity: {entity}")
|
logging.debug(f"Successfully combined data for entity: {entity}")
|
||||||
|
|
||||||
def _save_base_data(self, entity):
|
def _save_base_data(self, entity):
|
||||||
@@ -194,34 +200,34 @@ Then move the files back in one at a time oldest to newest and run again for eac
|
|||||||
return False
|
return False
|
||||||
logging.debug(f"Saved base data for entity: {entity} to path: {file_path}")
|
logging.debug(f"Saved base data for entity: {entity} to path: {file_path}")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _move_raw_to_processed(self, entity):
|
def _move_raw_to_processed(self, entity):
|
||||||
raw_entity_path = os.path.join(self.raw_data_path, entity)
|
raw_entity_path = os.path.join(self.raw_data_path, entity)
|
||||||
processed_path = os.path.join(self.processed_data_path, entity)
|
processed_path = os.path.join(self.processed_data_path, entity)
|
||||||
|
|
||||||
os.makedirs(processed_path, exist_ok=True)
|
os.makedirs(processed_path, exist_ok=True)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
files = [f for f in os.listdir(raw_entity_path) if f.endswith('.json')]
|
files = [f for f in os.listdir(raw_entity_path) if f.endswith('.json')]
|
||||||
if len(files) != 1:
|
if len(files) != 1:
|
||||||
logging.error(f"Expected exactly one file in path: {raw_entity_path}, but found {len(files)}")
|
logging.error(f"Expected exactly one file in path: {raw_entity_path}, but found {len(files)}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
file_name = files[0]
|
file_name = files[0]
|
||||||
raw_file_path = os.path.join(raw_entity_path, file_name)
|
raw_file_path = os.path.join(raw_entity_path, file_name)
|
||||||
processed_file_path = os.path.join(processed_path, file_name)
|
processed_file_path = os.path.join(processed_path, file_name)
|
||||||
|
|
||||||
logging.debug(f"Moving file: {raw_file_path} to {processed_file_path}")
|
logging.debug(f"Moving file: {raw_file_path} to {processed_file_path}")
|
||||||
|
|
||||||
os.rename(raw_file_path, processed_file_path)
|
os.rename(raw_file_path, processed_file_path)
|
||||||
logging.debug(f"Moved file: {file_name} to processed")
|
logging.debug(f"Moved file: {file_name} to processed")
|
||||||
|
|
||||||
except FileNotFoundError as e:
|
except FileNotFoundError as e:
|
||||||
logging.error(f"File not found: {e}")
|
logging.error(f"File not found: {e}")
|
||||||
return False
|
return False
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Failed to move file for entity: {entity}, error: {e}")
|
logging.error(f"Failed to move file for entity: {entity}, error: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
logging.debug(f"Moved processed file for entity: {entity} to path: {processed_path}")
|
logging.debug(f"Moved processed file for entity: {entity} to path: {processed_path}")
|
||||||
return True
|
return True
|
||||||
|
|||||||
Reference in New Issue
Block a user