Compare commits
6 Commits
| Author | SHA1 | Date |
|---|---|---|
| | 303cc62b56 | |
| | 22a338f790 | |
| | 0e682aca35 | |
| | 354f4c7fc6 | |
| | a43edb1148 | |
| | ad6b31e644 | |
README.md

```diff
@@ -15,11 +15,13 @@ The project consists of a main pipeline workflow that processes multiple modules
 
 ### main.py
 
-- Orchestrates the entire workflow pipeline
-- Uncompress the packed .gz.tar files to DAT files
-- Processes DAT files to ASC format
-- Generates timeseries data for specified locations
-- Combines grouped CSV files into consolidated datasets formatted for Infoworks ICM
+- **Startup Safety Check**: Scans the `COMBINED_FOLDER` at startup and warns the user if existing files are found, deleting them if the user chooses to continue.
+- **Batch Processing**: Processes input tar files in configurable batches to manage resource usage.
+- **Tidy by Default**: Default settings will delete all intermediate files and keep only the original tar files. This can be changed in config.py.
+- **End-to-End Processing**: Extracts GZ files, processes DAT/ASC, and appends to CSV in a single thread per file.
+- **Concurrency**: Uses multi-threading to process individual GZ files within a batch concurrently.
+- **Cumulative Data**: Automatically appends new query results to the existing CSV files in `COMBINED_FOLDER` for each batch, ensuring no data is lost and columns are correctly aligned.
+- **Dynamic ETA**: Provides a real-time estimate of completion time.
 
 ### extract.py
 
```
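Taken together, the new bullets describe a per-file pipeline rather than whole-stage passes over the folders. As a rough, illustrative outline of that flow (the stub names below are placeholders; the real steps are `Extract.process_single_gz`, `BatchNimrod._process_single_file`, and `GenerateTimeseries.process_asc_file`, shown in the main.py diff further down):

```python
# Illustrative outline only: stub implementations, not the project's code.
def extract_gz(path: str) -> str:         # GZ -> DAT
    return path.removesuffix(".gz")

def dat_to_asc(path: str) -> str:         # DAT -> ASC
    return path.replace(".dat", ".asc")

def query_timeseries(path: str) -> dict:  # ASC -> per-location results
    return {"dates": [], "values": []}

def handle_one_gz(gz_path: str) -> dict:
    return query_timeseries(dat_to_asc(extract_gz(gz_path)))

print(handle_one_gz("202001010000.dat.gz"))
```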
```diff
@@ -30,7 +32,7 @@ The project consists of a main pipeline workflow that processes multiple modules
 
 - Process multiple NIMROD dat files
 - Automatically extract datetime from file data
-- Export clipped raster data to ASC format
+- Export raster data to ASC format
 
 ### generate_timeseries.py
 
```
```diff
@@ -73,6 +75,7 @@ The `config.py` file defines folder paths and file deletion options:
 - ASC_TOP_FOLDER = "./asc_files"
 - COMBINED_FOLDER = "./combined_files"
 - ZONE_FOLDER = "./zone_inputs"
+- BATCH_SIZE = 5 (Number of tar files to process per batch)
 
 Example of how the zone csv files should look:
 
```
config.py

```diff
@@ -11,3 +11,5 @@ class Config:
     delete_gz_after_processing = True
     delete_dat_after_processing = True
     delete_asc_after_processing = True
+
+    BATCH_SIZE = 5
```
main.py

```diff
@@ -4,6 +4,7 @@ import os
 import csv
 import concurrent.futures
 from pathlib import Path
+import shutil
 
 from config import Config
 from modules import BatchNimrod, GenerateTimeseries, Extract
```
```diff
@@ -13,14 +14,40 @@ logging.basicConfig(
 )
 
 
-def process_pipeline(dat_file):
-    # 1. Process DAT to ASC
-    asc_file = batch._process_single_file(dat_file)
-    if not asc_file:
+def process_pipeline(gz_file_path):
+    # 1. Extract GZ to DAT
+    gz_path = Path(gz_file_path)
+    # The DAT file name is derived from the GZ file name (removing .gz);
+    # GZ files are often named like 'NAME.dat.gz'.
+    dat_filename = gz_path.name.replace(".gz", "")
+    dat_path = Path(Config.DAT_TOP_FOLDER, dat_filename)
+
+    # Extract
+    try:
+        extraction.process_single_gz(gz_path, dat_path)
+    except Exception as e:
+        logging.error(f"Failed to extract {gz_path}: {e}")
         return None
 
-    # 2. Extract data from ASC
+    if not dat_path.exists():
+        logging.error(f"DAT file not found after extraction: {dat_path}")
+        return None
+
+    # 2. Process DAT to ASC
+    # BatchNimrod._process_single_file expects just the filename, not the full path
+    asc_file = batch._process_single_file(dat_filename)
+    if not asc_file:
+        # Clean up the failed DAT file if needed (BatchNimrod may or may not have deleted it)
+        if Config.delete_dat_after_processing and dat_path.exists():
+            try:
+                os.remove(dat_path)
+            except OSError:
+                pass
+        return None
+
+    # 3. Extract data from ASC
     file_results = timeseries.process_asc_file(asc_file, locations)
 
     return file_results
```
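A small detail in the new `process_pipeline`: `gz_path.name.replace(".gz", "")` removes every `.gz` occurrence in the name, not just the suffix, which is harmless for the expected `NAME.dat.gz` pattern but worth knowing. A quick standalone illustration (the file name is hypothetical):

```python
from pathlib import Path

p = Path("./gz_files/batch_01/202001010000.dat.gz")  # hypothetical name
print(p.name.replace(".gz", ""))   # "202001010000.dat"
print(p.name.removesuffix(".gz"))  # same result, but strips the suffix only (Python 3.9+)
```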
```diff
@@ -57,33 +84,74 @@ if __name__ == "__main__":
     logging.info(f"Count of 1km Grids: {len(locations)}")
     logging.info(f"Count of Zones: {len(zones)}")
 
+    # Check for existing combined files
+    existing_combined = os.listdir(Config.COMBINED_FOLDER)
+    if existing_combined:
+        logging.warning("!" * 80)
+        logging.warning(
+            f"Found {len(existing_combined)} files in {Config.COMBINED_FOLDER}"
+        )
+        logging.warning(
+            "If you continue these WILL BE DELETED. Please make sure you have them saved."
+        )
+        logging.warning("!" * 80)
+        response = input("Continue? (Y/N): ").strip().lower()
+        if response != "y":
+            logging.info("Aborting...")
+            exit(0)
+        else:
+            shutil.rmtree(
+                Path(Config.COMBINED_FOLDER)
+            )  # Delete everything including the directory
+            Path(Config.COMBINED_FOLDER).mkdir()
+
     extraction = Extract(Config)
     batch = BatchNimrod(Config)
     timeseries = GenerateTimeseries(Config, locations)
 
     start = time.time()
     logging.info(
-        "Starting interleaved processing of DAT files and Timeseries generation"
+        "Starting interleaved processing of GZ files -> DAT -> ASC -> Timeseries"
     )
 
-    # Initialize results structure
+    # Get list of all tar files
+    all_tar_files = [f for f in os.listdir(Config.TAR_TOP_FOLDER) if f.endswith(".tar")]
+    all_tar_files.sort()
+    total_tars = len(all_tar_files)
+    files_per_tar = 288
+    estimated_total_files = total_tars * files_per_tar
+    logging.info(f"Found {total_tars} tar files to process")
+
+    # Process in batches
+    for i in range(0, total_tars, Config.BATCH_SIZE):
+        batch_files = all_tar_files[i : i + Config.BATCH_SIZE]
+        logging.info(
+            f"Processing batch {i // Config.BATCH_SIZE + 1}: {len(batch_files)} tar files"
+        )
+
+        # Initialize results structure for this batch
         results = {loc[0]: {"dates": [], "values": []} for loc in locations}
 
-    logging.info("Extracting tar and gz files")
-    extraction.run_extraction()
+        # 1. Extract batch (TAR -> GZ)
+        logging.info("Extracting tar files for batch")
+        extraction.extract_tar_batch(batch_files)
 
-    # Get list of DAT files
-    dat_files = [
-        f for f in os.listdir(Path(Config.DAT_TOP_FOLDER)) if not f.startswith(".")
-    ]
-    total_files = len(dat_files)
-
-    logging.info(f"Processing {total_files} files concurrently...")
+        gz_files_to_process = []
+        for tar_file in batch_files:
+            extract_folder = Path(Config.GZ_TOP_FOLDER, tar_file.replace(".tar", ""))
+            if extract_folder.exists():
+                for root, _, files in os.walk(extract_folder):
+                    for file in files:
+                        if file.endswith(".gz"):
+                            gz_files_to_process.append(Path(root, file))
+
+        total_files = len(gz_files_to_process)
+        logging.info(f"Found {total_files} GZ files to process concurrently...")
 
         with concurrent.futures.ThreadPoolExecutor() as executor:
             future_to_file = {
-                executor.submit(process_pipeline, dat_file): dat_file
-                for dat_file in dat_files
+                executor.submit(process_pipeline, gz_file): gz_file
+                for gz_file in gz_files_to_process
             }
 
             completed_count = 0
```
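One note on the executor here: `ThreadPoolExecutor()` without a `max_workers` argument defaults to a bounded pool (roughly the CPU count plus four, capped at 32), so the per-batch fan-out stays bounded even when a batch expands to thousands of GZ files.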
```diff
@@ -98,23 +166,59 @@ if __name__ == "__main__":
 
                     completed_count += 1
                     if completed_count % 100 == 0:
+                        files_processed_previous = i * files_per_tar
+                        files_processed_so_far = (
+                            files_processed_previous + completed_count
+                        )
+
                         elapsed_time = time.time() - start
-                        files_per_minute = (completed_count / elapsed_time) * 60
-                        remaining_files = total_files - completed_count
-                        eta_minutes = remaining_files / (files_per_minute / 60) / 60
-                        logging.info(f"""Processed {completed_count} out of {total_files} files.
-Speed: {files_per_minute:.2f} files/min. ETA: {eta_minutes:.2f} minutes""")
+                        rate_per_second = files_processed_so_far / elapsed_time
+
+                        remaining_files = estimated_total_files - files_processed_so_far
+
+                        if rate_per_second > 0:
+                            eta_seconds = remaining_files / rate_per_second
+
+                            if eta_seconds < 60:
+                                eta_str = f"{int(eta_seconds)}s"
+                            elif eta_seconds < 3600:
+                                eta_str = f"{int(eta_seconds // 60)}m {int(eta_seconds % 60)}s"
+                            else:
+                                eta_str = f"{int(eta_seconds // 3600)}h {int((eta_seconds % 3600) // 60)}m"
+                        else:
+                            eta_str = "Unknown"
+
+                        logging.info(f"""Progress: {files_processed_so_far}/{estimated_total_files} files ({files_processed_so_far / estimated_total_files * 100:.1f}%)
+Speed: {rate_per_second * 60:.2f} files/min. ETA: {eta_str}""")
         except KeyboardInterrupt:
-            logging.warning("KeyboardInterrupt received. Cancelling pending tasks...")
+            logging.warning(
+                "KeyboardInterrupt received. Cancelling pending tasks..."
+            )
             executor.shutdown(wait=False, cancel_futures=True)
             raise
 
-    elapsed_time = time.time() - start
-    logging.info(f"Interleaved processing completed in {elapsed_time:.2f} seconds")
+        logging.info("Appending batch results to CSV files...")
+        timeseries.append_results_to_csv(results, locations)
 
-    logging.info("Writing CSV files...")
-    timeseries.write_results_to_csv(results, locations)
+        # Cleanup GZ folders for this batch
+        # We loop through batch_files again to delete the folders we created
+        for tar_file in batch_files:
+            extract_folder = Path(Config.GZ_TOP_FOLDER, tar_file.replace(".tar", ""))
+            if extract_folder.exists():
+                try:
+                    shutil.rmtree(extract_folder)
+                except OSError as e:
+                    logging.warning(f"Failed to remove GZ folder {extract_folder}: {e}")
+
     end = time.time()
     elapsed_time = end - start
 
-    logging.info(f"All Complete total time {elapsed_time:.2f} seconds")
+    if elapsed_time < 60:
+        elapsed_time_str = f"{int(elapsed_time)}s"
+    elif elapsed_time < 3600:
+        elapsed_time_str = f"{int(elapsed_time // 60)}m {int(elapsed_time % 60)}s"
+    else:
+        elapsed_time_str = (
+            f"{int(elapsed_time // 3600)}h {int((elapsed_time % 3600) // 60)}m"
+        )
+
+    logging.info(f"All Complete total time {elapsed_time_str}")
```
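The seconds-to-`h/m/s` formatting now appears twice, once for the ETA and once for the final elapsed time. A small helper could consolidate it; a minimal sketch (`format_duration` is a hypothetical name, not part of this diff):

```python
def format_duration(seconds: float) -> str:
    """Render a duration as '42s', '3m 5s', or '2h 17m'."""
    seconds = int(seconds)
    if seconds < 60:
        return f"{seconds}s"
    if seconds < 3600:
        return f"{seconds // 60}m {seconds % 60}s"
    return f"{seconds // 3600}h {(seconds % 3600) // 60}m"
```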
extract.py (+30 -20)
```diff
@@ -3,6 +3,7 @@ import gzip
 import shutil
 import os
 from pathlib import Path
+import concurrent.futures
 
 
 class Extract:
```
```diff
@@ -10,12 +11,8 @@ class Extract:
     def __init__(self, Config):
         self.config = Config
 
-    def _extract_tar(self):
-        for tar_file in os.listdir(self.config.TAR_TOP_FOLDER):
-            # only handle .tar files
-            if not tar_file.endswith(".tar"):
-                pass
-
+    def extract_tar_batch(self, tar_files):
+        for tar_file in tar_files:
             tar_path = Path(self.config.TAR_TOP_FOLDER, tar_file)
 
             # Create a folder for extracted tar contents
```
```diff
@@ -31,32 +28,45 @@ class Extract:
             if self.config.delete_tar_after_processing:
                 os.remove(tar_path)
 
-    def _extract_gz(self):
-        for root, _, files in os.walk(self.config.GZ_TOP_FOLDER):
-            for file in files:
-                # only handle .gz files
-                if not file.endswith(".dat.gz"):
-                    pass  # adjust if extension differs
-                gz_path = Path(root, file)
-                dat_path = Path(self.config.DAT_TOP_FOLDER, file.replace(".gz", ""))
-
-                # Unzip .gz file
-                with gzip.open(gz_path, "rb") as f_in:
-                    with open(dat_path, "wb") as f_out:
-                        shutil.copyfileobj(f_in, f_out)
-
-                if self.config.delete_gz_after_processing:
-                    os.remove(gz_path)
+    def process_single_gz(self, gz_path, dat_path):
+        try:
+            with gzip.open(gz_path, "rb") as f_in:
+                with open(dat_path, "wb") as f_out:
+                    shutil.copyfileobj(f_in, f_out)
+
+            if self.config.delete_gz_after_processing:
+                os.remove(gz_path)
+        except Exception as e:
+            print(f"Error extracting {gz_path}: {e}")
+
+    def extract_gz_batch(self):
+        gz_tasks = []
+        for root, _, files in os.walk(self.config.GZ_TOP_FOLDER):
+            for file in files:
+                # only handle .gz files
+                if not file.endswith(".dat.gz"):
+                    continue
+
+                gz_path = Path(root, file)
+                dat_path = Path(self.config.DAT_TOP_FOLDER, file.replace(".gz", ""))
+                gz_tasks.append((gz_path, dat_path))
+
+        print(f"Extracting {len(gz_tasks)} gz files concurrently...")
+
+        with concurrent.futures.ThreadPoolExecutor() as executor:
+            futures = [
+                executor.submit(self.process_single_gz, gz_path, dat_path)
+                for gz_path, dat_path in gz_tasks
+            ]
+            concurrent.futures.wait(futures)
 
         try:
             shutil.rmtree(self.config.GZ_TOP_FOLDER)
+            # Recreate the folder for the next batch
+            Path(self.config.GZ_TOP_FOLDER).mkdir(exist_ok=True)
             print("processing complete and GZ files deleted")
         except Exception as e:
             print(str(e))
             print(
                 f"processing complete but GZ folder delete failed. Please delete manually ({self.config.GZ_TOP_FOLDER})"
             )
-
-    def run_extraction(self):
-        self._extract_tar()
-        self._extract_gz()
```
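With `run_extraction` removed, callers now drive the two stages separately: main.py calls `extract_tar_batch` per batch and `process_single_gz` per file, while `extract_gz_batch` remains for one-shot standalone use. A hypothetical standalone invocation (the tar file name is illustrative):

```python
from config import Config
from modules import Extract

extraction = Extract(Config)
extraction.extract_tar_batch(["202001.tar"])  # TAR -> GZ for one illustrative archive
extraction.extract_gz_batch()                 # every found .dat.gz -> DAT, concurrently
```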
generate_timeseries.py

```diff
@@ -56,11 +56,11 @@ class GenerateTimeseries:
         xpp = ncols_basin * cellres_basin
         ypp = nrows_basin * cellres_basin
 
-        start_col = np.floor(xp / cellres_radar)
-        end_col = np.ceil((xpp + xp) / cellres_radar)
+        start_col = np.floor(xp / cellres_radar) - 1
+        end_col = np.ceil((xpp + xp) / cellres_radar) - 1
 
-        start_row = np.floor(nrows_radar - ((yp + ypp) / cellres_radar))
-        end_row = np.ceil(nrows_radar - (yp / cellres_radar))
+        start_row = np.floor(nrows_radar - ((yp + ypp) / cellres_radar)) + 1
+        end_row = np.ceil(nrows_radar - (yp / cellres_radar)) + 1
 
         return int(start_col), int(start_row), int(end_col), int(end_row)
 
```
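To make the off-by-one adjustment concrete, here is a worked evaluation of the new formulas with assumed inputs (the numbers are illustrative only, not values from the project):

```python
import numpy as np

cellres_radar = 1000   # assumed 1 km radar cells
nrows_radar = 2175     # assumed radar grid height
xp, yp = 5500, 12300   # assumed basin origin (metres)
xpp = ypp = 3000       # assumed 3 km basin extent

start_col = np.floor(xp / cellres_radar) - 1                          # floor(5.5) - 1 = 4
end_col = np.ceil((xpp + xp) / cellres_radar) - 1                     # ceil(8.5) - 1 = 8
start_row = np.floor(nrows_radar - ((yp + ypp) / cellres_radar)) + 1  # floor(2159.7) + 1 = 2160
end_row = np.ceil(nrows_radar - (yp / cellres_radar)) + 1             # ceil(2162.7) + 1 = 2164
```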
```diff
@@ -164,8 +164,8 @@ class GenerateTimeseries:
             executor.shutdown(wait=False, cancel_futures=True)
             raise
 
-    def write_results_to_csv(self, results, locations):
-        """Write extracted data to CSV files for each zone.
+    def append_results_to_csv(self, results, locations):
+        """Append extracted data to CSV files for each zone.
 
         Args:
             results (dict): Aggregated results {zone_id: {'dates': [], 'values': []}}
```
```diff
@@ -178,58 +178,63 @@ class GenerateTimeseries:
             zone_name = loc[3]
 
             if zone_name not in zone_data:
-                zone_data[zone_name] = {"dates": [], "values": {}}
+                zone_data[zone_name] = {"dates": set(), "values": {}}
 
-            zone_data[zone_name]["values"][zone_id] = results[zone_id]["values"]
-            zone_data[zone_name]["dates"].extend(results[zone_id]["dates"])
+            # Create date -> value map for this grid square
+            raw_dates = results[zone_id]["dates"]
+            raw_values = results[zone_id]["values"]
+            date_value_map = dict(zip(raw_dates, raw_values))
 
-        # Get unique sorted dates across all zones
-        for zone_name, data in zone_data.items():
-            data["dates"] = sorted(set(data["dates"]))
+            zone_data[zone_name]["values"][zone_id] = date_value_map
+            zone_data[zone_name]["dates"].update(raw_dates)
 
         # Now write one CSV per zone with aligned timestamps
         for zone_name, data in zone_data.items():
-            dates = data["dates"]
+            sorted_dates = sorted(data["dates"])
             values_dict = data["values"]
 
             # Create aligned DataFrame
-            df_dict = {"datetime": dates}
-            for grid_square, values in values_dict.items():
-                # Align values to the common dates
-                aligned_values = []
-                value_iter = iter(values)
-                date_iter = iter(dates)
-
-                current_date = next(date_iter, None)
-                current_value = next(value_iter, None)
-
-                for expected_date in dates:
-                    if current_date == expected_date:
-                        aligned_values.append(current_value)
-                        try:
-                            current_date = next(date_iter)
-                            current_value = next(value_iter)
-                        except StopIteration:
-                            current_date = None
-                            current_value = None
-                    else:
-                        aligned_values.append(None)  # Missing value
+            df_dict = {"datetime": sorted_dates}
+            for grid_square, dv_map in values_dict.items():
+                # Align values to the common sorted dates using the map
+                aligned_values = [dv_map.get(d) for d in sorted_dates]
 
                 df_dict[grid_square] = aligned_values
 
-            df = pd.DataFrame(df_dict)
+            new_df = pd.DataFrame(df_dict)
 
-            # Sort by datetime (already sorted)
-            sorted_df = df.sort("datetime")
-
-            # Format datetime column
-            sorted_df = sorted_df.with_columns(
+            # Format datetime column in new_df
+            new_df = new_df.with_columns(
                 pd.col("datetime").dt.strftime("%Y-%m-%d %H:%M:%S")
             )
 
             output_path = (
                 Path(self.config.COMBINED_FOLDER) / f"{zone_name}_timeseries_data.csv"
             )
 
+            if output_path.exists():
+                # Load existing CSV
+                existing_df = pd.read_csv(output_path)
+
+                # Reorder new_df to match existing_df
+                new_df = new_df.select(existing_df.columns)
+
+                # Concatenate
+                combined_df = pd.concat([existing_df, new_df])
+                # Sort by datetime
+                combined_df = combined_df.sort("datetime")
+                # Write back
+                combined_df.write_csv(output_path, float_precision=4)
+            else:
+                # Write new CSV
+                # Sort columns to ensure deterministic order (datetime first)
+                cols = new_df.columns
+                cols.remove("datetime")
+                cols.sort()
+                sorted_cols = ["datetime"] + cols
+                new_df = new_df.select(sorted_cols)
+
+                # Sort by datetime (already sorted, but good practice)
+                sorted_df = new_df.sort("datetime")
                 sorted_df.write_csv(output_path, float_precision=4)
 
-        logging.info("All CSV files written.")
+        logging.info("All CSV files updated.")
```
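The rewritten alignment replaces the fragile dual-iterator walk with a per-grid `date -> value` map, so timestamps missing for one grid square simply come back as `None`. The idea in isolation, with illustrative data:

```python
# Two grid squares reporting at partially overlapping timestamps.
a = {"2020-01-01 00:00": 0.2, "2020-01-01 00:05": 0.4}
b = {"2020-01-01 00:05": 1.0}

all_dates = sorted(set(a) | set(b))
aligned_a = [a.get(d) for d in all_dates]  # [0.2, 0.4]
aligned_b = [b.get(d) for d in all_dates]  # [None, 1.0]
```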
pyproject.toml (+2 -2)

```diff
@@ -1,7 +1,7 @@
 [project]
 name = "met-office"
-version = "1.2.0"
-description = "Convert .dat nimrod files to .asc files"
+version = "1.3.3"
+description = "Convert nimrod files to .csv timeseries"
 readme = "README.md"
 requires-python = ">=3.14"
 dependencies = [
```
uv.lock

```diff
@@ -4,7 +4,7 @@ requires-python = ">=3.14"
 
 [[package]]
 name = "met-office"
-version = "1.2.0"
+version = "1.3.3"
 source = { virtual = "." }
 dependencies = [
     { name = "numpy" },
```