Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
5da185a826
|
|||
|
1d21ab5f36
|
|||
|
0e682aca35
|
|||
| 354f4c7fc6 |
@@ -15,8 +15,9 @@ The project consists of a main pipeline workflow that processes multiple modules
|
|||||||
|
|
||||||
### main.py
|
### main.py
|
||||||
|
|
||||||
- **Startup Safety Check**: Scans the `COMBINED_FOLDER` at startup and warns the user if existing files are found, offering a chance to abort to prevent accidental data mixing.
|
- **Startup Safety Check**: Scans the `COMBINED_FOLDER` at startup and warns the user if existing files are found, deleting the existing files if the user chooses to continue.
|
||||||
- **Batch Processing**: Processes input tar files in configurable batches to manage resource usage.
|
- **Batch Processing**: Processes input tar files in configurable batches to manage resource usage.
|
||||||
|
- **Tidy by Default**: Default settings will delete all mid-step files and keep only the original Tar files. This can be changed in config.py.
|
||||||
- **End-to-End Processing**: Extracts GZ files, processes DAT/ASC, and appends to CSV in a single thread per file.
|
- **End-to-End Processing**: Extracts GZ files, processes DAT/ASC, and appends to CSV in a single thread per file.
|
||||||
- **Concurrency**: Uses multi-threading to process individual GZ files within a batch concurrently.
|
- **Concurrency**: Uses multi-threading to process individual GZ files within a batch concurrently.
|
||||||
- **Cumulative Data**: Automatically appends new query results to the existing CSV files in `COMBINED_FOLDER` for each batch, ensuring no data is lost and columns are correctly aligned.
|
- **Cumulative Data**: Automatically appends new query results to the existing CSV files in `COMBINED_FOLDER` for each batch, ensuring no data is lost and columns are correctly aligned.
|
||||||
@@ -31,7 +32,7 @@ The project consists of a main pipeline workflow that processes multiple modules
|
|||||||
|
|
||||||
- Process multiple NIMROD dat files
|
- Process multiple NIMROD dat files
|
||||||
- Automatically extract datetime from file data
|
- Automatically extract datetime from file data
|
||||||
- Export clipped raster data to ASC format
|
- Export raster data to ASC format
|
||||||
|
|
||||||
### generate_timeseries.py
|
### generate_timeseries.py
|
||||||
|
|
||||||
|
|||||||
@@ -92,13 +92,18 @@ if __name__ == "__main__":
|
|||||||
f"Found {len(existing_combined)} files in {Config.COMBINED_FOLDER}"
|
f"Found {len(existing_combined)} files in {Config.COMBINED_FOLDER}"
|
||||||
)
|
)
|
||||||
logging.warning(
|
logging.warning(
|
||||||
"You may want to remove these before continuing to avoid duplicates or messy data."
|
"If you continue these WILL BE DELETED, Please make sure you have them saved."
|
||||||
)
|
)
|
||||||
logging.warning("!" * 80)
|
logging.warning("!" * 80)
|
||||||
response = input("Continue? (Y/N): ").strip().lower()
|
response = input("Continue? (Y/N): ").strip().lower()
|
||||||
if response != "y":
|
if response != "y":
|
||||||
logging.info("Aborting...")
|
logging.info("Aborting...")
|
||||||
exit(0)
|
exit(0)
|
||||||
|
else:
|
||||||
|
shutil.rmtree(
|
||||||
|
Path(Config.COMBINED_FOLDER)
|
||||||
|
) # Delete everything including the directory
|
||||||
|
Path(Config.COMBINED_FOLDER).mkdir()
|
||||||
|
|
||||||
extraction = Extract(Config)
|
extraction = Extract(Config)
|
||||||
batch = BatchNimrod(Config)
|
batch = BatchNimrod(Config)
|
||||||
@@ -130,12 +135,6 @@ if __name__ == "__main__":
|
|||||||
# 1. Extract batch (TAR -> GZ)
|
# 1. Extract batch (TAR -> GZ)
|
||||||
logging.info("Extracting tar files for batch")
|
logging.info("Extracting tar files for batch")
|
||||||
extraction.extract_tar_batch(batch_files)
|
extraction.extract_tar_batch(batch_files)
|
||||||
# Note: We do NOT run extract_gz_batch anymore. We will find GZ files and process them.
|
|
||||||
|
|
||||||
# Get list of GZ files (recursively or flat?)
|
|
||||||
# extract_tar_batch puts them in GZ_TOP_FOLDER/tar_name_without_ext
|
|
||||||
# So we need to look there.
|
|
||||||
# Ideally we know where we put them.
|
|
||||||
|
|
||||||
gz_files_to_process = []
|
gz_files_to_process = []
|
||||||
for tar_file in batch_files:
|
for tar_file in batch_files:
|
||||||
@@ -218,6 +217,8 @@ if __name__ == "__main__":
|
|||||||
elif elapsed_time < 3600:
|
elif elapsed_time < 3600:
|
||||||
elapsed_time_str = f"{int(elapsed_time // 60)}m {int(elapsed_time % 60)}s"
|
elapsed_time_str = f"{int(elapsed_time // 60)}m {int(elapsed_time % 60)}s"
|
||||||
else:
|
else:
|
||||||
elapsed_time_str = f"{int(elapsed_time // 3600)}h {int((elapsed_time % 3600) // 60)}m"
|
elapsed_time_str = (
|
||||||
|
f"{int(elapsed_time // 3600)}h {int((elapsed_time % 3600) // 60)}m"
|
||||||
|
)
|
||||||
|
|
||||||
logging.info(f"All Complete total time {elapsed_time_str}")
|
logging.info(f"All Complete total time {elapsed_time_str}")
|
||||||
|
|||||||
@@ -56,11 +56,11 @@ class GenerateTimeseries:
|
|||||||
xpp = ncols_basin * cellres_basin
|
xpp = ncols_basin * cellres_basin
|
||||||
ypp = nrows_basin * cellres_basin
|
ypp = nrows_basin * cellres_basin
|
||||||
|
|
||||||
start_col = np.floor(xp / cellres_radar)
|
start_col = np.floor(xp / cellres_radar) - 1
|
||||||
end_col = np.ceil((xpp + xp) / cellres_radar)
|
end_col = np.ceil((xpp + xp) / cellres_radar) - 1
|
||||||
|
|
||||||
start_row = np.floor(nrows_radar - ((yp + ypp) / cellres_radar))
|
start_row = np.floor(nrows_radar - ((yp + ypp) / cellres_radar)) + 1
|
||||||
end_row = np.ceil(nrows_radar - (yp / cellres_radar))
|
end_row = np.ceil(nrows_radar - (yp / cellres_radar)) + 1
|
||||||
|
|
||||||
return int(start_col), int(start_row), int(end_col), int(end_row)
|
return int(start_col), int(start_row), int(end_col), int(end_row)
|
||||||
|
|
||||||
@@ -178,43 +178,26 @@ class GenerateTimeseries:
|
|||||||
zone_name = loc[3]
|
zone_name = loc[3]
|
||||||
|
|
||||||
if zone_name not in zone_data:
|
if zone_name not in zone_data:
|
||||||
zone_data[zone_name] = {"dates": [], "values": {}}
|
zone_data[zone_name] = {"dates": set(), "values": {}}
|
||||||
|
|
||||||
zone_data[zone_name]["values"][zone_id] = results[zone_id]["values"]
|
# Create date -> value map for this grid square
|
||||||
zone_data[zone_name]["dates"].extend(results[zone_id]["dates"])
|
raw_dates = results[zone_id]["dates"]
|
||||||
|
raw_values = results[zone_id]["values"]
|
||||||
|
date_value_map = dict(zip(raw_dates, raw_values))
|
||||||
|
|
||||||
# Get unique sorted dates across all zones
|
zone_data[zone_name]["values"][zone_id] = date_value_map
|
||||||
for zone_name, data in zone_data.items():
|
zone_data[zone_name]["dates"].update(raw_dates)
|
||||||
data["dates"] = sorted(set(data["dates"]))
|
|
||||||
|
|
||||||
# Now write one CSV per zone with aligned timestamps
|
# Now write one CSV per zone with aligned timestamps
|
||||||
for zone_name, data in zone_data.items():
|
for zone_name, data in zone_data.items():
|
||||||
dates = data["dates"]
|
sorted_dates = sorted(data["dates"])
|
||||||
values_dict = data["values"]
|
values_dict = data["values"]
|
||||||
|
|
||||||
# Create aligned DataFrame
|
# Create aligned DataFrame
|
||||||
df_dict = {"datetime": dates}
|
df_dict = {"datetime": sorted_dates}
|
||||||
for grid_square, values in values_dict.items():
|
for grid_square, dv_map in values_dict.items():
|
||||||
# Align values to the common dates
|
# Align values to the common search dates using the map
|
||||||
aligned_values = []
|
aligned_values = [dv_map.get(d) for d in sorted_dates]
|
||||||
value_iter = iter(values)
|
|
||||||
date_iter = iter(dates)
|
|
||||||
|
|
||||||
current_date = next(date_iter, None)
|
|
||||||
current_value = next(value_iter, None)
|
|
||||||
|
|
||||||
for expected_date in dates:
|
|
||||||
if current_date == expected_date:
|
|
||||||
aligned_values.append(current_value)
|
|
||||||
try:
|
|
||||||
current_date = next(date_iter)
|
|
||||||
current_value = next(value_iter)
|
|
||||||
except StopIteration:
|
|
||||||
current_date = None
|
|
||||||
current_value = None
|
|
||||||
else:
|
|
||||||
aligned_values.append(None) # Missing value
|
|
||||||
|
|
||||||
df_dict[grid_square] = aligned_values
|
df_dict[grid_square] = aligned_values
|
||||||
|
|
||||||
new_df = pd.DataFrame(df_dict)
|
new_df = pd.DataFrame(df_dict)
|
||||||
|
|||||||
+2
-2
@@ -1,7 +1,7 @@
|
|||||||
[project]
|
[project]
|
||||||
name = "met-office"
|
name = "met-office"
|
||||||
version = "1.3.1"
|
version = "1.3.2"
|
||||||
description = "Convert .dat nimrod files to .asc files"
|
description = "Convert nimrod files to .csv timeseries"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
requires-python = ">=3.14"
|
requires-python = ">=3.14"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ requires-python = ">=3.14"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "met-office"
|
name = "met-office"
|
||||||
version = "1.3.1"
|
version = "1.3.2"
|
||||||
source = { virtual = "." }
|
source = { virtual = "." }
|
||||||
dependencies = [
|
dependencies = [
|
||||||
{ name = "numpy" },
|
{ name = "numpy" },
|
||||||
|
|||||||
Reference in New Issue
Block a user