better handling of merging the data in the combine, lead to not needing to handle duplicates
This commit is contained in:
@@ -10,3 +10,4 @@ NOT_FOUND = 8
|
||||
CONFLICT = 9
|
||||
MOVE_FILE_ERROR = 10
|
||||
DUPLICATE_RESOLUTION_ERROR = 11
|
||||
UNIQUE_ID_NOT_FOUND = 12
|
||||
+41
-16
@@ -32,9 +32,6 @@ class RawToBase:
|
||||
continue
|
||||
self._load_existing_base_data(entity)
|
||||
self._combine_data(entity)
|
||||
if not self._resolve_duplicates(entity):
|
||||
logging.error(f"entity: {entity} failed duplicate resolution.")
|
||||
sys.exit(ec.DUPLICATE_RESOLUTION_ERROR)
|
||||
if not self._save_base_data(entity):
|
||||
logging.error(f"Skipping processing for entity: {entity} due to failed saving base data.")
|
||||
continue
|
||||
@@ -126,9 +123,23 @@ Then move the files back in one at a time oldest to newest and run again for eac
|
||||
self.base_data[entity] = pl.DataFrame()
|
||||
logging.debug(f"No existing base data found for entity: {entity}, starting with an empty DataFrame")
|
||||
|
||||
#Function to cast null Struct({'': Null}) columns to String
|
||||
def _cast_struct_to_string(self,df):
|
||||
for col in df.columns:
|
||||
if df.schema[col] == pl.Struct({'': pl.Null}):
|
||||
df = df.with_columns(
|
||||
pl.when(pl.col(col).is_null())
|
||||
.then(pl.lit("null"))
|
||||
.otherwise(pl.col(col).map_elements(lambda x: str(x) if x is not None else "null"))
|
||||
.alias(col)
|
||||
)
|
||||
return df
|
||||
|
||||
def _combine_data(self, entity):
|
||||
logging.debug(f"Combining data for entity: {entity}")
|
||||
combined_data = []
|
||||
|
||||
# Combine data from the entity
|
||||
if entity == 'categories':
|
||||
for data in self.data[entity]:
|
||||
for category in data:
|
||||
@@ -143,22 +154,36 @@ Then move the files back in one at a time oldest to newest and run again for eac
|
||||
unique_id = self.primary_keys[entity]['unique_id']
|
||||
if unique_id not in new_data_df.columns:
|
||||
logging.error(f"Unique ID column '{unique_id}' not found in the combined data for entity: {entity}")
|
||||
exit(1)
|
||||
exit(ec.UNIQUE_ID_NOT_FOUND)
|
||||
|
||||
# Cast columns in new_data_df
|
||||
new_data_df = self._cast_struct_to_string(new_data_df)
|
||||
|
||||
# Merge new data with existing base data
|
||||
if entity in self.base_data and not self.base_data[entity].is_empty():
|
||||
existing_data_df = self.base_data[entity]
|
||||
|
||||
# Cast columns in existing_data_df
|
||||
existing_data_df = self._cast_struct_to_string(existing_data_df)
|
||||
|
||||
# Identify new rows and rows to update
|
||||
new_rows = new_data_df.filter(~pl.col(unique_id).is_in(existing_data_df[unique_id]))
|
||||
updated_rows = new_data_df.filter(pl.col(unique_id).is_in(existing_data_df[unique_id]))
|
||||
|
||||
# Update existing rows
|
||||
for row in updated_rows.iter_rows(named=True):
|
||||
existing_data_df = existing_data_df.with_columns([
|
||||
pl.when(pl.col(unique_id) == row[unique_id]).then(pl.lit(row[col], allow_object=True)).otherwise(pl.col(col)).alias(col)
|
||||
for col in updated_rows.columns if col != unique_id
|
||||
])
|
||||
|
||||
# Add new rows
|
||||
self.base_data[entity] = pl.concat([existing_data_df, new_rows])
|
||||
else:
|
||||
self.base_data[entity] = new_data_df
|
||||
|
||||
self.base_data[entity] = new_data_df
|
||||
logging.debug(f"Successfully combined data for entity: {entity}")
|
||||
|
||||
def _resolve_duplicates(self, entity):
|
||||
logging.debug(f"Resolving duplicates for entity: {entity}")
|
||||
unique_id = self.primary_keys[entity]['unique_id']
|
||||
try:
|
||||
self.base_data[entity] = self.base_data[entity].sort(by='ingestion_date').unique(subset=unique_id, keep='first')
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to resolve duplicates for entity: {entity}, error: {e}")
|
||||
return False
|
||||
logging.debug(f"Successfully resolved duplicates for entity: {entity}")
|
||||
return True
|
||||
|
||||
def _save_base_data(self, entity):
|
||||
os.makedirs(self.base_data_path, exist_ok=True)
|
||||
file_path = os.path.join(self.base_data_path, f'{entity}.parquet')
|
||||
|
||||
Reference in New Issue
Block a user