Compare commits
6 Commits
| Author | SHA1 | Date |
|---|---|---|
|  | 9aaf8a5e88 |  |
|  | a43edb1148 |  |
|  | ad6b31e644 |  |
|  | d386317957 |  |
|  | 1c6418e044 |  |
|  | 85deee7843 |  |
@@ -10,6 +10,8 @@ wheels/
 .venv

 dat_other/*
+tar_files/*
+gz_files/*
 dat_files/*
 asc_files/*
 csv_files/*
@@ -1,6 +1,6 @@
 # UK Met Office Rain Radar NIMROD Data Processor

-This project provides tools for processing UK Met Office Rain Radar NIMROD image files. It allows extraction of raster data from NIMROD .dat format files and conversion to ESRI ASCII (.asc) format. It also allows the creation of timeseries data from the ASC files.
+This project provides tools for processing UK Met Office Rain Radar NIMROD image files. It allows extraction of raster data from NIMROD .dat format files and conversion to ESRI ASCII (.asc) format. It also allows the creation of timeseries data from the ASC files, formatted for Infoworks ICM.

 ## Overview

@@ -9,16 +9,23 @@ The project consists of a main pipeline workflow that processes multiple modules
 - `main.py`: Main pipeline orchestrator that calls on the modules as needed
 - `batch_nimrod.py`: Module for batch processing multiple NIMROD files with configurable bounding boxes
 - `generate_timeseries.py`: Module for extracting cropped rain data and creating rainfall timeseries
-- `combine_timeseries.py`: Module for combining grouped timeseries CSVs into consolidated datasets
+- `extract.py`: Module for extracting the .dat files from the .gz.tar files that are downloaded from source

 ## Features

 ### main.py

-- Orchestrates the entire workflow pipeline
-- Processes DAT files to ASC format
-- Generates timeseries data for specified locations
-- Combines grouped CSV files into consolidated datasets
+- **Startup Safety Check**: Scans the `COMBINED_FOLDER` at startup and warns the user if existing files are found, deleting them if the user chooses to continue.
+- **Batch Processing**: Processes input tar files in configurable batches to manage resource usage.
+- **End-to-End Processing**: Extracts GZ files, processes DAT/ASC, and appends to CSV in a single thread per file.
+- **Concurrency**: Uses multi-threading to process individual GZ files within a batch concurrently.
+- **Cumulative Data**: Automatically appends each batch's results to the existing CSV files in `COMBINED_FOLDER`, ensuring no data is lost and columns are correctly aligned.
+- **Dynamic ETA**: Provides a real-time estimate of completion time.
+
+### extract.py
+
+- Converts each .gz.tar file into 288 .gz files (one day of data)
+- Converts all .gz files to .dat files ready for processing

 ### batch_nimrod.py

@@ -31,10 +38,6 @@ The project consists of a main pipeline workflow that processes multiple modules
 - Extract cropped rain data based on specified locations
 - Create rainfall timeseries CSVs for each location
 - Parse datetime from filename and create proper datetime index

-### combine_timeseries.py
-
-- Combine multiple timeseries CSV files into grouped datasets
 - Group locations by specified output groups
 - Create consolidated CSV files for each group

@@ -49,24 +52,29 @@ It is recommended to use UV for environment and package handling.

 1. Ensure all required packages are installed `uv sync`
 1. Adjust the config.py file to match your needs.
-1. Ensure your .dat files are in the DAT_TOP_FOLDER (as per config location)
+1. Ensure your .gz.tar files are in the TAR_TOP_FOLDER (as per config location)
 1. Ensure your zone csv files are in the ZONE_FOLDER (as per config location)
 1. Run Main Pipeline `uv run main.py` Note that you will have to set your environment variable `PYTHON_GIL=0` first
 1. Find the output in the COMBINED_FOLDER (as per config location)

 The main pipeline will:

-1. Process DAT files to ASC format if needed
+1. Uncompress the .gz.tar files ready for processing
+1. Process DAT files to ASC format
 1. Generate timeseries data for specified locations
-1. Combine grouped CSV files into consolidated datasets
+1. Combine grouped locations into consolidated datasets

 ## Configuration

-The `config.py` file defines folder paths:
+The `config.py` file defines folder paths and file deletion options:

-- DAT_TOP_FOLDER: "./dat_files"
-- ASC_TOP_FOLDER: "./asc_files"
-- COMBINED_FOLDER: "./combined_files"
+- TAR_TOP_FOLDER = "./tar_files"
+- GZ_TOP_FOLDER = "./gz_files"
+- DAT_TOP_FOLDER = "./dat_files"
+- ASC_TOP_FOLDER = "./asc_files"
+- COMBINED_FOLDER = "./combined_files"
+- ZONE_FOLDER = "./zone_inputs"
+- BATCH_SIZE = 5 (Number of tar files to process per batch)

 Example of how the zone csv files should look:

@@ -1,8 +1,15 @@
 class Config:
+    TAR_TOP_FOLDER = "./tar_files"
+    GZ_TOP_FOLDER = "./gz_files"
     DAT_TOP_FOLDER = "./dat_files"
     ASC_TOP_FOLDER = "./asc_files"
     COMBINED_FOLDER = "./combined_files"

     ZONE_FOLDER = "./zone_inputs"

-    delete_dat_after_processing = False
+    delete_tar_after_processing = False
+    delete_gz_after_processing = True
+    delete_dat_after_processing = True
     delete_asc_after_processing = True
+
+    BATCH_SIZE = 5
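Read as a whole, the hunk above leaves the `config.py` class looking roughly like this (a sketch assembled from the diff; anything outside the hunk is assumed unchanged):

```python
class Config:
    TAR_TOP_FOLDER = "./tar_files"
    GZ_TOP_FOLDER = "./gz_files"
    DAT_TOP_FOLDER = "./dat_files"
    ASC_TOP_FOLDER = "./asc_files"
    COMBINED_FOLDER = "./combined_files"

    ZONE_FOLDER = "./zone_inputs"

    # Per-stage cleanup switches read by the pipeline modules
    delete_tar_after_processing = False
    delete_gz_after_processing = True
    delete_dat_after_processing = True
    delete_asc_after_processing = True

    BATCH_SIZE = 5  # Number of tar files to process per batch
```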
@@ -4,17 +4,67 @@ import os
 import csv
 import concurrent.futures
 from pathlib import Path
+import shutil

 from config import Config
-from modules import BatchNimrod, GenerateTimeseries
+from modules import BatchNimrod, GenerateTimeseries, Extract

 logging.basicConfig(
     level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
 )


+def process_pipeline(gz_file_path):
+    # 1. Extract GZ to DAT
+    gz_path = Path(gz_file_path)
+    # The dat file name is derived from the gz file name (removing .gz or .dat.gz)
+    # gz files are named like 'NAME.dat.gz' often.
+    dat_filename = gz_path.name.replace(".gz", "")
+    dat_path = Path(Config.DAT_TOP_FOLDER, dat_filename)
+
+    # Extract
+    try:
+        extraction.process_single_gz(gz_path, dat_path)
+    except Exception as e:
+        logging.error(f"Failed to extract {gz_path}: {e}")
+        return None
+
+    if not dat_path.exists():
+        logging.error(f"DAT file not found after extraction: {dat_path}")
+        return None
+
+    # 2. Process DAT to ASC
+    # BatchNimrod._process_single_file expects just the filename, not full path
+    asc_file = batch._process_single_file(dat_filename)
+    if not asc_file:
+        # Cleanup failed DAT file if needed (BatchNimrod might have done it or not)
+        if Config.delete_dat_after_processing and dat_path.exists():
+            try:
+                os.remove(dat_path)
+            except OSError:
+                pass
+        return None
+
+    # 3. Extract data from ASC
+    file_results = timeseries.process_asc_file(asc_file, locations)
+
+    return file_results
+
+
+def initialise_folders():
+    folder_list = [
+        Config.ASC_TOP_FOLDER,
+        Config.COMBINED_FOLDER,
+        Config.GZ_TOP_FOLDER,
+        Config.DAT_TOP_FOLDER,
+        Config.TAR_TOP_FOLDER,
+    ]
+    for path in folder_list:
+        Path(path).mkdir(exist_ok=True)
+
+
 if __name__ == "__main__":
-    os.makedirs(Path(Config.ASC_TOP_FOLDER), exist_ok=True)
-    os.makedirs(Path(Config.COMBINED_FOLDER), exist_ok=True)
+    initialise_folders()

     locations = []
     zones = set()
@@ -34,39 +84,72 @@ if __name__ == "__main__":
     logging.info(f"Count of 1km Grids: {len(locations)}")
     logging.info(f"Count of Zones: {len(zones)}")

+    # Check for existing combined files
+    existing_combined = os.listdir(Config.COMBINED_FOLDER)
+    if existing_combined:
+        logging.warning("!" * 80)
+        logging.warning(
+            f"Found {len(existing_combined)} files in {Config.COMBINED_FOLDER}"
+        )
+        logging.warning(
+            "If you continue these WILL BE DELETED, Please make sure you have them saved."
+        )
+        logging.warning("!" * 80)
+        response = input("Continue? (Y/N): ").strip().lower()
+        if response != "y":
+            logging.info("Aborting...")
+            exit(0)
+        else:
+            shutil.rmtree(Path(Config.COMBINED_FOLDER))  # Delete everything including the directory
+            Path(Config.COMBINED_FOLDER).mkdir()
+
+    extraction = Extract(Config)
     batch = BatchNimrod(Config)
     timeseries = GenerateTimeseries(Config, locations)

     start = time.time()
     logging.info(
-        "Starting interleaved processing of DAT files and Timeseries generation"
+        "Starting interleaved processing of GZ files -> DAT -> ASC -> Timeseries"
     )

-    # Initialize results structure
-    results = {loc[0]: {"dates": [], "values": []} for loc in locations}
-
-    def process_pipeline(dat_file):
-        # 1. Process DAT to ASC
-        asc_file = batch._process_single_file(dat_file)
-        if not asc_file:
-            return None
-
-        # 2. Extract data from ASC
-        file_results = timeseries.process_asc_file(asc_file, locations)
-        return file_results
-
-    # Get list of DAT files
-    dat_files = [
-        f for f in os.listdir(Path(Config.DAT_TOP_FOLDER)) if not f.startswith(".")
-    ]
-    total_files = len(dat_files)
-
-    logging.info(f"Processing {total_files} files concurrently...")
+    # Get list of all tar files
+    all_tar_files = [f for f in os.listdir(Config.TAR_TOP_FOLDER) if f.endswith(".tar")]
+    all_tar_files.sort()
+    total_tars = len(all_tar_files)
+    files_per_tar = 288
+    estimated_total_files = total_tars * files_per_tar
+    logging.info(f"Found {total_tars} tar files to process")
+
+    # Process in batches
+    for i in range(0, total_tars, Config.BATCH_SIZE):
+        batch_files = all_tar_files[i : i + Config.BATCH_SIZE]
+        logging.info(
+            f"Processing batch {i // Config.BATCH_SIZE + 1}: {len(batch_files)} tar files"
+        )
+
+        # Initialize results structure for this batch
+        results = {loc[0]: {"dates": [], "values": []} for loc in locations}
+
+        # 1. Extract batch (TAR -> GZ)
+        logging.info("Extracting tar files for batch")
+        extraction.extract_tar_batch(batch_files)
+
+        gz_files_to_process = []
+        for tar_file in batch_files:
+            extract_folder = Path(Config.GZ_TOP_FOLDER, tar_file.replace(".tar", ""))
+            if extract_folder.exists():
+                for root, _, files in os.walk(extract_folder):
+                    for file in files:
+                        if file.endswith(".gz"):
+                            gz_files_to_process.append(Path(root, file))
+
+        total_files = len(gz_files_to_process)
+        logging.info(f"Found {total_files} GZ files to process concurrently...")

-    with concurrent.futures.ThreadPoolExecutor() as executor:
-        future_to_file = {
-            executor.submit(process_pipeline, dat_file): dat_file
-            for dat_file in dat_files
-        }
-
-        completed_count = 0
+        with concurrent.futures.ThreadPoolExecutor() as executor:
+            future_to_file = {
+                executor.submit(process_pipeline, gz_file): gz_file
+                for gz_file in gz_files_to_process
+            }
+
+            completed_count = 0
@@ -81,23 +164,57 @@ if __name__ == "__main__":

                     completed_count += 1
                     if completed_count % 100 == 0:
+                        files_processed_previous = i * files_per_tar
+                        files_processed_so_far = (
+                            files_processed_previous + completed_count
+                        )
+
                         elapsed_time = time.time() - start
-                    files_per_minute = (completed_count / elapsed_time) * 60
-                    remaining_files = total_files - completed_count
-                    eta_minutes = remaining_files / (files_per_minute / 60) / 60
-                    logging.info(f"""Processed {completed_count} out of {total_files} files.
-Speed: {files_per_minute:.2f} files/min. ETA: {eta_minutes:.2f} minutes""")
+                        rate_per_second = files_processed_so_far / elapsed_time
+
+                        remaining_files = estimated_total_files - files_processed_so_far
+
+                        if rate_per_second > 0:
+                            eta_seconds = remaining_files / rate_per_second
+
+                            if eta_seconds < 60:
+                                eta_str = f"{int(eta_seconds)}s"
+                            elif eta_seconds < 3600:
+                                eta_str = f"{int(eta_seconds // 60)}m {int(eta_seconds % 60)}s"
+                            else:
+                                eta_str = f"{int(eta_seconds // 3600)}h {int((eta_seconds % 3600) // 60)}m"
+                        else:
+                            eta_str = "Unknown"
+
+                        logging.info(f"""Progress: {files_processed_so_far}/{estimated_total_files} files ({files_processed_so_far / estimated_total_files * 100:.1f}%)
+Speed: {rate_per_second * 60:.2f} files/min. ETA: {eta_str}""")
             except KeyboardInterrupt:
-                logging.warning("KeyboardInterrupt received. Cancelling pending tasks...")
+                logging.warning(
+                    "KeyboardInterrupt received. Cancelling pending tasks..."
+                )
                 executor.shutdown(wait=False, cancel_futures=True)
                 raise

-    elapsed_time = time.time() - start
-    logging.info(f"Interleaved processing completed in {elapsed_time:.2f} seconds")
+        logging.info("Appending batch results to CSV files...")
+        timeseries.append_results_to_csv(results, locations)

-    logging.info("Writing CSV files...")
-    timeseries.write_results_to_csv(results, locations)
+        # Cleanup GZ folders for this batch
+        # We loop through batch_files again to delete the folders we created
+        for tar_file in batch_files:
+            extract_folder = Path(Config.GZ_TOP_FOLDER, tar_file.replace(".tar", ""))
+            if extract_folder.exists():
+                try:
+                    shutil.rmtree(extract_folder)
+                except OSError as e:
+                    logging.warning(f"Failed to remove GZ folder {extract_folder}: {e}")
+
     end = time.time()
     elapsed_time = end - start

-    logging.info(f"All Complete total time {elapsed_time:.2f} seconds")
+    if elapsed_time < 60:
+        elapsed_time_str = f"{int(elapsed_time)}s"
+    elif elapsed_time < 3600:
+        elapsed_time_str = f"{int(elapsed_time // 60)}m {int(elapsed_time % 60)}s"
+    else:
+        elapsed_time_str = f"{int(elapsed_time // 3600)}h {int((elapsed_time % 3600) // 60)}m"
+
+    logging.info(f"All Complete total time {elapsed_time_str}")
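The README step quoted earlier asks for `PYTHON_GIL=0` before `uv run main.py`, presumably so the thread pool above can use multiple cores for the CPU-heavy DAT/ASC work. A minimal startup guard along those lines, not part of the diff (`sys._is_gil_enabled` may not be exposed on every build, so its absence is treated here as the GIL being enabled):

```python
import sys

# Hypothetical guard, not in main.py: refuse to run if the GIL is still active.
gil_enabled = getattr(sys, "_is_gil_enabled", lambda: True)()
if gil_enabled:
    raise SystemExit(
        "Set PYTHON_GIL=0 on a free-threaded Python build before `uv run main.py`."
    )
```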
+2 -5
@@ -1,9 +1,6 @@
 from .nimrod import Nimrod
 from .batch_nimrod import BatchNimrod
 from .generate_timeseries import GenerateTimeseries
+from .extract import Extract

-__all__ = [
-    "Nimrod",
-    "BatchNimrod",
-    "GenerateTimeseries",
-]
+__all__ = ["Nimrod", "BatchNimrod", "GenerateTimeseries", "Extract"]
Executable +72
@@ -0,0 +1,72 @@
+import tarfile
+import gzip
+import shutil
+import os
+from pathlib import Path
+import concurrent.futures
+
+
+class Extract:
+    # Directory containing .tar files
+    def __init__(self, Config):
+        self.config = Config
+
+    def extract_tar_batch(self, tar_files):
+        for tar_file in tar_files:
+            tar_path = Path(self.config.TAR_TOP_FOLDER, tar_file)
+
+            # Create a folder for extracted tar contents
+            extract_folder = Path(
+                self.config.GZ_TOP_FOLDER, tar_file.replace(".tar", "")
+            )
+            Path(extract_folder).mkdir(exist_ok=True)
+
+            # Extract .tar file
+            with tarfile.open(tar_path, "r") as tar:
+                tar.extractall(path=extract_folder)
+
+            if self.config.delete_tar_after_processing:
+                os.remove(tar_path)
+
+    def process_single_gz(self, gz_path, dat_path):
+        try:
+            with gzip.open(gz_path, "rb") as f_in:
+                with open(dat_path, "wb") as f_out:
+                    shutil.copyfileobj(f_in, f_out)
+
+            if self.config.delete_gz_after_processing:
+                os.remove(gz_path)
+        except Exception as e:
+            print(f"Error extracting {gz_path}: {e}")
+
+    def extract_gz_batch(self):
+        gz_tasks = []
+        for root, _, files in os.walk(self.config.GZ_TOP_FOLDER):
+            for file in files:
+                # only handle .gz files
+                if not file.endswith(".dat.gz"):
+                    continue
+
+                gz_path = Path(root, file)
+                dat_path = Path(self.config.DAT_TOP_FOLDER, file.replace(".gz", ""))
+                gz_tasks.append((gz_path, dat_path))
+
+        print(f"Extracting {len(gz_tasks)} gz files concurrently...")
+
+        with concurrent.futures.ThreadPoolExecutor() as executor:
+            futures = [
+                executor.submit(self.process_single_gz, gz_path, dat_path)
+                for gz_path, dat_path in gz_tasks
+            ]
+            concurrent.futures.wait(futures)
+
+        try:
+            shutil.rmtree(self.config.GZ_TOP_FOLDER)
+            # Recreate the folder for the next batch
+            Path(self.config.GZ_TOP_FOLDER).mkdir(exist_ok=True)
+            print("processing complete and GZ files deleted")
+        except Exception as e:
+            print(str(e))
+            print(
+                f"processing complete but GZ folder delete failed. Please delete manually ({self.config.GZ_TOP_FOLDER})"
+            )
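A minimal usage sketch for the new `Extract` class, mirroring how `main.py` drives it in the hunks above; the archive and file names are illustrative placeholders, not taken from the repository:

```python
from pathlib import Path

from config import Config
from modules import Extract

extraction = Extract(Config)

# One batch of .gz.tar archives from TAR_TOP_FOLDER -> a folder of .gz files per archive
extraction.extract_tar_batch(["example_day.tar"])  # hypothetical file name

# A single .gz -> .dat extraction, as called from process_pipeline() in main.py
gz_path = Path(Config.GZ_TOP_FOLDER, "example_day", "example.dat.gz")  # hypothetical path
dat_path = Path(Config.DAT_TOP_FOLDER, "example.dat")
extraction.process_single_gz(gz_path, dat_path)
```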
@@ -164,8 +164,8 @@ class GenerateTimeseries:
             executor.shutdown(wait=False, cancel_futures=True)
             raise

-    def write_results_to_csv(self, results, locations):
-        """Write extracted data to CSV files for each zone.
+    def append_results_to_csv(self, results, locations):
+        """Append extracted data to CSV files for each zone.

         Args:
             results (dict): Aggregated results {zone_id: {'dates': [], 'values': []}}
@@ -217,19 +217,41 @@ class GenerateTimeseries:

             df_dict[grid_square] = aligned_values

-        df = pd.DataFrame(df_dict)
+        new_df = pd.DataFrame(df_dict)

-        # Sort by datetime (already sorted)
-        sorted_df = df.sort("datetime")
-
-        # Format datetime column
-        sorted_df = sorted_df.with_columns(
+        # Format datetime column in new_df
+        new_df = new_df.with_columns(
             pd.col("datetime").dt.strftime("%Y-%m-%d %H:%M:%S")
         )

         output_path = (
             Path(self.config.COMBINED_FOLDER) / f"{zone_name}_timeseries_data.csv"
         )

+        if output_path.exists():
+            # Load existing CSV
+            existing_df = pd.read_csv(output_path)
+
+            # Reorder new_df to match existing_df
+            new_df = new_df.select(existing_df.columns)
+
+            # Concatenate
+            combined_df = pd.concat([existing_df, new_df])
+            # Sort by datetime
+            combined_df = combined_df.sort("datetime")
+            # Write back
+            combined_df.write_csv(output_path, float_precision=4)
+        else:
+            # Write new CSV
+            # Sort columns to ensure deterministic order (datetime first)
+            cols = new_df.columns
+            cols.remove("datetime")
+            cols.sort()
+            sorted_cols = ["datetime"] + cols
+            new_df = new_df.select(sorted_cols)
+
+            # Sort by datetime (already sorted but good practice)
+            sorted_df = new_df.sort("datetime")
-        sorted_df.write_csv(output_path, float_precision=4)
+            sorted_df.write_csv(output_path, float_precision=4)

-        logging.info("All CSV files written.")
+        logging.info("All CSV files updated.")
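The dataframe calls in this hunk (`with_columns`, `select`, `sort`, `write_csv(..., float_precision=...)`) match the polars API rather than pandas, so the sketch below assumes the module's `pd` alias is `import polars as pd`; the zone column names and values are illustrative only:

```python
import polars as pd  # assumption: the `pd` used above is polars

existing_df = pd.DataFrame({"datetime": ["2024-01-01 00:00:00"], "zone_a": [0.0], "zone_b": [0.5]})
new_df = pd.DataFrame({"zone_b": [1.0], "zone_a": [0.25], "datetime": ["2024-01-01 00:05:00"]})

# Align the new batch's column order to the existing CSV, then append and re-sort.
# This is what keeps columns "correctly aligned" across batches.
new_df = new_df.select(existing_df.columns)
combined_df = pd.concat([existing_df, new_df]).sort("datetime")
print(combined_df)
```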
+2 -2
@@ -1,7 +1,7 @@
 [project]
 name = "met-office"
-version = "1.1.0"
-description = "Convert .dat nimrod files to .asc files"
+version = "1.3.2"
+description = "Convert nimrod files to .csv timeseries"
 readme = "README.md"
 requires-python = ">=3.14"
 dependencies = [
@@ -4,7 +4,7 @@ requires-python = ">=3.14"

 [[package]]
 name = "met-office"
-version = "1.1.0"
+version = "1.3.2"
 source = { virtual = "." }
 dependencies = [
     { name = "numpy" },