Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
85deee7843
|
|||
|
e4f8c2d502
|
|||
|
59f459d4d0
|
|||
|
84ba6c837c
|
|||
|
c415b81bc8
|
|||
|
bd0a421bb9
|
|||
|
4bd32641bd
|
|||
|
83405eb17e
|
|||
|
009c40e08a
|
@@ -9,6 +9,7 @@ wheels/
|
|||||||
# Virtual environments
|
# Virtual environments
|
||||||
.venv
|
.venv
|
||||||
|
|
||||||
|
dat_other/*
|
||||||
dat_files/*
|
dat_files/*
|
||||||
asc_files/*
|
asc_files/*
|
||||||
csv_files/*
|
csv_files/*
|
||||||
@@ -16,3 +17,5 @@ combined_files/*
|
|||||||
zone_inputs/*
|
zone_inputs/*
|
||||||
|
|
||||||
*.tar.gz
|
*.tar.gz
|
||||||
|
|
||||||
|
generate_test_data.py
|
||||||
@@ -5,31 +5,31 @@ This project provides tools for processing UK Met Office Rain Radar NIMROD image
|
|||||||
## Overview
|
## Overview
|
||||||
|
|
||||||
The project consists of a main pipeline workflow that processes multiple modules in sequence:
|
The project consists of a main pipeline workflow that processes multiple modules in sequence:
|
||||||
|
|
||||||
- `main.py`: Main pipeline orchestrator that calls on the modules as needed
|
- `main.py`: Main pipeline orchestrator that calls on the modules as needed
|
||||||
- `batch_nimrod.py`: Module for batch processing multiple NIMROD files with configurable bounding boxes
|
- `batch_nimrod.py`: Module for batch processing multiple NIMROD files with configurable bounding boxes
|
||||||
- `generate_timeseries.py`: Module for extracting cropped rain data and creating rainfall timeseries
|
- `generate_timeseries.py`: Module for extracting cropped rain data and creating rainfall timeseries
|
||||||
- `combine_timeseries.py`: Module for combining grouped timeseries CSVs into consolidated datasets
|
|
||||||
|
|
||||||
## Features
|
## Features
|
||||||
|
|
||||||
### main.py
|
### main.py
|
||||||
|
|
||||||
- Orchestrates the entire workflow pipeline
|
- Orchestrates the entire workflow pipeline
|
||||||
- Processes DAT files to ASC format
|
- Processes DAT files to ASC format
|
||||||
- Generates timeseries data for specified locations
|
- Generates timeseries data for specified locations
|
||||||
- Combines grouped CSV files into consolidated datasets
|
- Combines grouped CSV files into consolidated datasets
|
||||||
|
|
||||||
### batch_nimrod.py
|
### batch_nimrod.py
|
||||||
|
|
||||||
- Process multiple NIMROD dat files
|
- Process multiple NIMROD dat files
|
||||||
- Automatically extract datetime from file data
|
- Automatically extract datetime from file data
|
||||||
- Export clipped raster data to ASC format
|
- Export clipped raster data to ASC format
|
||||||
|
|
||||||
### generate_timeseries.py
|
### generate_timeseries.py
|
||||||
|
|
||||||
- Extract cropped rain data based on specified locations
|
- Extract cropped rain data based on specified locations
|
||||||
- Create rainfall timeseries CSVs for each location
|
- Create rainfall timeseries CSVs for each location
|
||||||
- Parse datetime from filename and create proper datetime index
|
- Parse datetime from filename and create proper datetime index
|
||||||
|
|
||||||
### combine_timeseries.py
|
|
||||||
- Combine multiple timeseries CSV files into grouped datasets
|
|
||||||
- Group locations by specified output groups
|
- Group locations by specified output groups
|
||||||
- Create consolidated CSV files for each group
|
- Create consolidated CSV files for each group
|
||||||
|
|
||||||
@@ -46,32 +46,34 @@ It is recommended to use UV for environment and package handling.
|
|||||||
1. Adjust the config.py file to match your needs.
|
1. Adjust the config.py file to match your needs.
|
||||||
1. Ensure your .dat files are in the DAT_TOP_FOLDER (as per config location)
|
1. Ensure your .dat files are in the DAT_TOP_FOLDER (as per config location)
|
||||||
1. Ensure your zone csv files are in the ZONE_FOLDER (as per config location)
|
1. Ensure your zone csv files are in the ZONE_FOLDER (as per config location)
|
||||||
1. RunMain Pipeline `uv run main.py
|
1. RunMain Pipeline `uv run main.py` Note that you will have to set your environment variable `PYTHON_GIL=0` first
|
||||||
1. find the output in the COMBINED_FOLDER (as per config location)
|
1. find the output in the COMBINED_FOLDER (as per config location)
|
||||||
|
|
||||||
|
|
||||||
The main pipeline will:
|
The main pipeline will:
|
||||||
|
|
||||||
1. Process DAT files to ASC format if needed
|
1. Process DAT files to ASC format if needed
|
||||||
2. Generate timeseries data for specified locations
|
1. Generate timeseries data for specified locations
|
||||||
3. Combine grouped CSV files into consolidated datasets
|
1. Combine grouped CSV files into consolidated datasets
|
||||||
|
|
||||||
## Configuration
|
## Configuration
|
||||||
|
|
||||||
The `config.py` file defines folder paths:
|
The `config.py` file defines folder paths:
|
||||||
|
|
||||||
- DAT_TOP_FOLDER: "./dat_files"
|
- DAT_TOP_FOLDER: "./dat_files"
|
||||||
- ASC_TOP_FOLDER: "./asc_files"
|
- ASC_TOP_FOLDER: "./asc_files"
|
||||||
- CSV_TOP_FOLDER: "./csv_files"
|
|
||||||
- COMBINED_FOLDER: "./combined_files"
|
- COMBINED_FOLDER: "./combined_files"
|
||||||
|
|
||||||
Example of how the zone csv files should look:
|
Example of how the zone csv files should look:
|
||||||
```
|
|
||||||
filler, zone_name, easting, northing, other_filler, last_filler, zone_number
|
```csv
|
||||||
aa, TM0816, 608500, 216500, a, a, 1
|
1K Grid, easting, northing, zone_number
|
||||||
aa, TF6842, 568500, 342500, a, a, 1
|
TM0816, 608500, 216500, 1
|
||||||
|
TF6842, 568500, 342500, 1
|
||||||
```
|
```
|
||||||
|
|
||||||
## Acknowledgments
|
## Acknowledgments
|
||||||
|
|
||||||
Thank you to the following projects for their inspiration and code:
|
Thank you to the following projects for their inspiration and code:
|
||||||
* [Richard Thomas - Original Nimrod dat to asc file conversion](https://github.com/richard-thomas/MetOffice_NIMROD)
|
|
||||||
* [Declan Valters - building the timeseries from the asc files](https://github.com/dvalters/NIMROD-toolbox)
|
- [Richard Thomas - Original Nimrod dat to asc file conversion](https://github.com/richard-thomas/MetOffice_NIMROD)
|
||||||
|
- [Declan Valters - building the timeseries from the asc files](https://github.com/dvalters/NIMROD-toolbox)
|
||||||
|
|||||||
@@ -1,10 +1,8 @@
|
|||||||
class Config:
|
class Config:
|
||||||
DAT_TOP_FOLDER = "./dat_files"
|
DAT_TOP_FOLDER = "./dat_files"
|
||||||
ASC_TOP_FOLDER = "./asc_files"
|
ASC_TOP_FOLDER = "./asc_files"
|
||||||
CSV_TOP_FOLDER = "./csv_files"
|
|
||||||
COMBINED_FOLDER = "./combined_files"
|
COMBINED_FOLDER = "./combined_files"
|
||||||
ZONE_FOLDER = "./zone_inputs"
|
ZONE_FOLDER = "./zone_inputs"
|
||||||
|
|
||||||
delete_dat_after_processing = False
|
delete_dat_after_processing = False
|
||||||
delete_asc_after_processing = True
|
delete_asc_after_processing = True
|
||||||
delete_csv_after_combining = True
|
|
||||||
@@ -2,58 +2,101 @@ import logging
|
|||||||
import time
|
import time
|
||||||
import os
|
import os
|
||||||
import csv
|
import csv
|
||||||
|
import concurrent.futures
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from config import Config
|
from config import Config
|
||||||
from modules import BatchNimrod, GenerateTimeseries, CombineTimeseries
|
from modules import BatchNimrod, GenerateTimeseries
|
||||||
|
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
|
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def process_pipeline(dat_file):
|
||||||
|
# 1. Process DAT to ASC
|
||||||
|
asc_file = batch._process_single_file(dat_file)
|
||||||
|
if not asc_file:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# 2. Extract data from ASC
|
||||||
|
file_results = timeseries.process_asc_file(asc_file, locations)
|
||||||
|
return file_results
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
os.makedirs(Path(Config.ASC_TOP_FOLDER), exist_ok=True)
|
os.makedirs(Path(Config.ASC_TOP_FOLDER), exist_ok=True)
|
||||||
os.makedirs(Path(Config.CSV_TOP_FOLDER), exist_ok=True)
|
|
||||||
os.makedirs(Path(Config.COMBINED_FOLDER), exist_ok=True)
|
os.makedirs(Path(Config.COMBINED_FOLDER), exist_ok=True)
|
||||||
|
|
||||||
locations = []
|
locations = []
|
||||||
|
zones = set()
|
||||||
# load zone inputs here
|
# load zone inputs here
|
||||||
for file in os.listdir(Path(Config.ZONE_FOLDER)):
|
for file in os.listdir(Path(Config.ZONE_FOLDER)):
|
||||||
with open(Path(Config.ZONE_FOLDER,file), 'r') as csvfile:
|
with open(Path(Config.ZONE_FOLDER, file), "r") as csvfile:
|
||||||
reader = csv.reader(csvfile)
|
reader = csv.reader(csvfile)
|
||||||
header = next(reader) # Skip header row
|
header = next(reader) # Skip header row
|
||||||
for row in reader:
|
for row in reader:
|
||||||
# Extract the relevant fields: Ossheet (location ID), Easting, Northing, Zone
|
# Extract the relevant fields: 1K Grid, Easting, Northing, Zone
|
||||||
zone_id = row[1] # Ossheet column
|
grid_name = row[0] # 1k Grid name
|
||||||
easting = int(row[2]) # Easting column
|
easting = int(row[1]) # Easting column
|
||||||
northing = int(row[3]) # Northing column
|
northing = int(row[2]) # Northing column
|
||||||
zone = int(row[6]) # ZoneID column
|
zone = int(row[3]) # ZoneID column
|
||||||
locations.append([zone_id, easting, northing, zone])
|
locations.append([grid_name, easting, northing, zone])
|
||||||
|
zones.add(zone)
|
||||||
|
logging.info(f"Count of 1km Grids: {len(locations)}")
|
||||||
|
logging.info(f"Count of Zones: {len(zones)}")
|
||||||
|
|
||||||
batch = BatchNimrod(Config)
|
batch = BatchNimrod(Config)
|
||||||
timeseries = GenerateTimeseries(Config)
|
timeseries = GenerateTimeseries(Config, locations)
|
||||||
combiner = CombineTimeseries(Config, locations)
|
|
||||||
|
|
||||||
start = time.time()
|
start = time.time()
|
||||||
logging.info("Starting to process DAT to ASC")
|
logging.info(
|
||||||
|
"Starting interleaved processing of DAT files and Timeseries generation"
|
||||||
|
)
|
||||||
|
|
||||||
batch.process_nimrod_files()
|
# Initialize results structure
|
||||||
batch_checkpoint = time.time()
|
results = {loc[0]: {"dates": [], "values": []} for loc in locations}
|
||||||
elapsed_time = batch_checkpoint - start
|
|
||||||
logging.info(f"DAT to ASC completed in {elapsed_time:.2f} seconds")
|
|
||||||
|
|
||||||
logging.info("Starting generating timeseries data for all locations.")
|
# Get list of DAT files
|
||||||
place_start = time.time()
|
dat_files = [
|
||||||
timeseries.extract_data_for_all_locations(locations)
|
f for f in os.listdir(Path(Config.DAT_TOP_FOLDER)) if not f.startswith(".")
|
||||||
place_end = time.time()
|
]
|
||||||
place_create_time = place_end - place_start
|
total_files = len(dat_files)
|
||||||
elapsed_time = place_end - start
|
|
||||||
logging.info(f"Timeseries generation completed in {place_create_time:.2f} seconds")
|
|
||||||
logging.info(f"Total time so far {elapsed_time:.2f} seconds")
|
|
||||||
|
|
||||||
logging.info("combining CSVs into groups")
|
logging.info(f"Processing {total_files} files concurrently...")
|
||||||
combiner.combine_csv_files()
|
|
||||||
logging.info("CSVs combined!")
|
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||||
|
future_to_file = {
|
||||||
|
executor.submit(process_pipeline, dat_file): dat_file
|
||||||
|
for dat_file in dat_files
|
||||||
|
}
|
||||||
|
|
||||||
|
completed_count = 0
|
||||||
|
try:
|
||||||
|
for future in concurrent.futures.as_completed(future_to_file):
|
||||||
|
file_results = future.result()
|
||||||
|
if file_results:
|
||||||
|
for res in file_results:
|
||||||
|
zone_id = res["zone_id"]
|
||||||
|
results[zone_id]["dates"].append(res["date"])
|
||||||
|
results[zone_id]["values"].append(res["value"])
|
||||||
|
|
||||||
|
completed_count += 1
|
||||||
|
if completed_count % 100 == 0:
|
||||||
|
elapsed_time = time.time() - start
|
||||||
|
files_per_minute = (completed_count / elapsed_time) * 60
|
||||||
|
remaining_files = total_files - completed_count
|
||||||
|
eta_minutes = remaining_files / (files_per_minute / 60) / 60
|
||||||
|
logging.info(f"""Processed {completed_count} out of {total_files} files.
|
||||||
|
Speed: {files_per_minute:.2f} files/min. ETA: {eta_minutes:.2f} minutes""")
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logging.warning("KeyboardInterrupt received. Cancelling pending tasks...")
|
||||||
|
executor.shutdown(wait=False, cancel_futures=True)
|
||||||
|
raise
|
||||||
|
|
||||||
|
elapsed_time = time.time() - start
|
||||||
|
logging.info(f"Interleaved processing completed in {elapsed_time:.2f} seconds")
|
||||||
|
|
||||||
|
logging.info("Writing CSV files...")
|
||||||
|
timeseries.write_results_to_csv(results, locations)
|
||||||
end = time.time()
|
end = time.time()
|
||||||
elapsed_time = end - start
|
elapsed_time = end - start
|
||||||
|
|
||||||
|
|||||||
@@ -1,11 +1,9 @@
|
|||||||
from .nimrod import Nimrod
|
from .nimrod import Nimrod
|
||||||
from .batch_nimrod import BatchNimrod
|
from .batch_nimrod import BatchNimrod
|
||||||
from .generate_timeseries import GenerateTimeseries
|
from .generate_timeseries import GenerateTimeseries
|
||||||
from .combine_timeseries import CombineTimeseries
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"Nimrod",
|
"Nimrod",
|
||||||
"BatchNimrod",
|
"BatchNimrod",
|
||||||
"GenerateTimeseries",
|
"GenerateTimeseries",
|
||||||
"CombineTimeseries"
|
|
||||||
]
|
]
|
||||||
+16
-5
@@ -5,7 +5,6 @@ import logging
|
|||||||
import concurrent.futures
|
import concurrent.futures
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class BatchNimrod:
|
class BatchNimrod:
|
||||||
def __init__(self, config) -> None:
|
def __init__(self, config) -> None:
|
||||||
self.config = config
|
self.config = config
|
||||||
@@ -36,7 +35,7 @@ class BatchNimrod:
|
|||||||
os.remove(in_file_full)
|
os.remove(in_file_full)
|
||||||
|
|
||||||
logging.debug(f"Successfully processed: {in_file_full}")
|
logging.debug(f"Successfully processed: {in_file_full}")
|
||||||
return True
|
return out_file_name
|
||||||
|
|
||||||
except Nimrod.HeaderReadError as e:
|
except Nimrod.HeaderReadError as e:
|
||||||
logging.error(f"Failed to read file {in_file_full}, is it corrupt?")
|
logging.error(f"Failed to read file {in_file_full}, is it corrupt?")
|
||||||
@@ -59,7 +58,11 @@ class BatchNimrod:
|
|||||||
box for each area, and exports clipped raster data to OUT_TOP_FOLDER.
|
box for each area, and exports clipped raster data to OUT_TOP_FOLDER.
|
||||||
"""
|
"""
|
||||||
# Read all file names in the folder
|
# Read all file names in the folder
|
||||||
files_to_process = [f for f in os.listdir(Path(self.config.DAT_TOP_FOLDER)) if not f.startswith('.')]
|
files_to_process = [
|
||||||
|
f
|
||||||
|
for f in os.listdir(Path(self.config.DAT_TOP_FOLDER))
|
||||||
|
if not f.startswith(".")
|
||||||
|
]
|
||||||
total_files = len(files_to_process)
|
total_files = len(files_to_process)
|
||||||
|
|
||||||
logging.info(f"Processing {total_files} files concurrently...")
|
logging.info(f"Processing {total_files} files concurrently...")
|
||||||
@@ -72,8 +75,16 @@ class BatchNimrod:
|
|||||||
}
|
}
|
||||||
|
|
||||||
completed_count = 0
|
completed_count = 0
|
||||||
|
try:
|
||||||
for future in concurrent.futures.as_completed(future_to_file):
|
for future in concurrent.futures.as_completed(future_to_file):
|
||||||
completed_count += 1
|
completed_count += 1
|
||||||
if completed_count % 10 == 0:
|
if completed_count % 10 == 0:
|
||||||
logging.info(f'processed {completed_count} out of {total_files} files')
|
logging.info(
|
||||||
|
f"processed {completed_count} out of {total_files} files"
|
||||||
|
)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logging.warning(
|
||||||
|
"KeyboardInterrupt received. Cancelling pending tasks..."
|
||||||
|
)
|
||||||
|
executor.shutdown(wait=False, cancel_futures=True)
|
||||||
|
raise
|
||||||
|
|||||||
@@ -1,37 +0,0 @@
|
|||||||
import polars as pd
|
|
||||||
import os
|
|
||||||
|
|
||||||
|
|
||||||
class CombineTimeseries:
|
|
||||||
def __init__(self, config, locations):
|
|
||||||
self.config = config
|
|
||||||
self.locations = locations
|
|
||||||
self.grouped_locations = {}
|
|
||||||
self.build_location_groups()
|
|
||||||
|
|
||||||
def build_location_groups(self):
|
|
||||||
for location in self.locations:
|
|
||||||
group = location[3] # zone number
|
|
||||||
if group not in self.grouped_locations:
|
|
||||||
self.grouped_locations[group] = []
|
|
||||||
self.grouped_locations[group].append(location)
|
|
||||||
|
|
||||||
def combine_csv_files(self):
|
|
||||||
for group, loc_list in self.grouped_locations.items():
|
|
||||||
combined_df = None
|
|
||||||
for loc in loc_list:
|
|
||||||
csv_to_load = f"./csv_files/{loc[0]}_timeseries_data.csv"
|
|
||||||
df = pd.read_csv(csv_to_load)
|
|
||||||
if combined_df is None:
|
|
||||||
combined_df = df
|
|
||||||
else:
|
|
||||||
combined_df = combined_df.join(df, on='datetime')
|
|
||||||
|
|
||||||
if self.config.delete_csv_after_combining:
|
|
||||||
os.remove(csv_to_load)
|
|
||||||
|
|
||||||
output_file = (
|
|
||||||
f"{self.config.COMBINED_FOLDER}/zone_{group}_timeseries_data.csv"
|
|
||||||
)
|
|
||||||
sorted_df = combined_df.sort('datetime')
|
|
||||||
sorted_df.write_csv(output_file)
|
|
||||||
@@ -5,12 +5,13 @@ import polars as pd
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import os
|
import os
|
||||||
import concurrent.futures
|
import concurrent.futures
|
||||||
|
import logging
|
||||||
|
|
||||||
|
|
||||||
class GenerateTimeseries:
|
class GenerateTimeseries:
|
||||||
def __init__(self, config):
|
def __init__(self, config, locations):
|
||||||
self.config = config
|
self.config = config
|
||||||
|
self.locations = locations
|
||||||
|
|
||||||
def _read_ascii_header(self, ascii_raster_file: str) -> list:
|
def _read_ascii_header(self, ascii_raster_file: str) -> list:
|
||||||
"""Reads header information from an ASCII DEM
|
"""Reads header information from an ASCII DEM
|
||||||
@@ -63,7 +64,7 @@ class GenerateTimeseries:
|
|||||||
|
|
||||||
return int(start_col), int(start_row), int(end_col), int(end_row)
|
return int(start_col), int(start_row), int(end_col), int(end_row)
|
||||||
|
|
||||||
def _process_single_file(self, file_name, locations):
|
def process_asc_file(self, file_name, locations):
|
||||||
"""Process a single ASC file and extract data for all locations.
|
"""Process a single ASC file and extract data for all locations.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -75,7 +76,7 @@ class GenerateTimeseries:
|
|||||||
or None if processing fails.
|
or None if processing fails.
|
||||||
Format: [{'zone_id': id, 'date': datetime, 'value': float}, ...]
|
Format: [{'zone_id': id, 'date': datetime, 'value': float}, ...]
|
||||||
"""
|
"""
|
||||||
if not file_name.endswith('.asc'):
|
if not file_name.endswith(".asc"):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
file_path = Path(self.config.ASC_TOP_FOLDER, file_name)
|
file_path = Path(self.config.ASC_TOP_FOLDER, file_name)
|
||||||
@@ -111,18 +112,13 @@ class GenerateTimeseries:
|
|||||||
# print(f"Warning: Crop too small for {zone_id} in {file_name}")
|
# print(f"Warning: Crop too small for {zone_id} in {file_name}")
|
||||||
val = 0.0
|
val = 0.0
|
||||||
|
|
||||||
results.append({
|
results.append({"zone_id": zone_id, "date": parsed_date, "value": val})
|
||||||
'zone_id': zone_id,
|
|
||||||
'date': parsed_date,
|
|
||||||
'value': val
|
|
||||||
})
|
|
||||||
|
|
||||||
if self.config.delete_asc_after_processing:
|
if self.config.delete_asc_after_processing:
|
||||||
os.remove(file_path)
|
os.remove(file_path)
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error processing file {file_name}: {e}")
|
print(f"Error processing file {file_name}: {e}")
|
||||||
return None
|
return None
|
||||||
@@ -134,7 +130,7 @@ class GenerateTimeseries:
|
|||||||
locations (list): List of location data [zone_id, easting, northing, zone]
|
locations (list): List of location data [zone_id, easting, northing, zone]
|
||||||
"""
|
"""
|
||||||
# Initialize data structure to hold results: {zone_id: {'dates': [], 'values': []}}
|
# Initialize data structure to hold results: {zone_id: {'dates': [], 'values': []}}
|
||||||
results = {loc[0]: {'dates': [], 'values': []} for loc in locations}
|
results = {loc[0]: {"dates": [], "values": []} for loc in locations}
|
||||||
|
|
||||||
# Get list of ASC files
|
# Get list of ASC files
|
||||||
asc_files = sorted(os.listdir(Path(self.config.ASC_TOP_FOLDER)))
|
asc_files = sorted(os.listdir(Path(self.config.ASC_TOP_FOLDER)))
|
||||||
@@ -143,40 +139,87 @@ class GenerateTimeseries:
|
|||||||
|
|
||||||
# Use ThreadPoolExecutor for concurrent processing
|
# Use ThreadPoolExecutor for concurrent processing
|
||||||
# Since we are using Python 3.14t (free-threaded), this should scale well even for CPU work
|
# Since we are using Python 3.14t (free-threaded), this should scale well even for CPU work
|
||||||
# mixed with I/O.
|
|
||||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||||
# Submit all tasks
|
# Submit all tasks
|
||||||
future_to_file = {
|
future_to_file = {
|
||||||
executor.submit(self._process_single_file, file_name, locations): file_name
|
executor.submit(self.process_asc_file, file_name, locations): file_name
|
||||||
for file_name in asc_files
|
for file_name in asc_files
|
||||||
}
|
}
|
||||||
|
|
||||||
completed_count = 0
|
completed_count = 0
|
||||||
|
try:
|
||||||
for future in concurrent.futures.as_completed(future_to_file):
|
for future in concurrent.futures.as_completed(future_to_file):
|
||||||
file_results = future.result()
|
file_results = future.result()
|
||||||
if file_results:
|
if file_results:
|
||||||
for res in file_results:
|
for res in file_results:
|
||||||
zone_id = res['zone_id']
|
zone_id = res["zone_id"]
|
||||||
results[zone_id]['dates'].append(res['date'])
|
results[zone_id]["dates"].append(res["date"])
|
||||||
results[zone_id]['values'].append(res['value'])
|
results[zone_id]["values"].append(res["value"])
|
||||||
|
|
||||||
completed_count += 1
|
completed_count += 1
|
||||||
if completed_count % 100 == 0:
|
if completed_count % 100 == 0:
|
||||||
print(f"Processed {completed_count}/{total_files} files")
|
print(f"Processed {completed_count}/{total_files} files")
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("KeyboardInterrupt received. Cancelling pending tasks...")
|
||||||
|
executor.shutdown(wait=False, cancel_futures=True)
|
||||||
|
raise
|
||||||
|
|
||||||
# Write CSVs for each location
|
def write_results_to_csv(self, results, locations):
|
||||||
print("Writing CSV files...")
|
"""Write extracted data to CSV files for each zone.
|
||||||
for location in locations:
|
|
||||||
zone_id = location[0]
|
|
||||||
data = results[zone_id]
|
|
||||||
|
|
||||||
if not data['dates']:
|
Args:
|
||||||
print(f"No data found for {zone_id}")
|
results (dict): Aggregated results {zone_id: {'dates': [], 'values': []}}
|
||||||
continue
|
locations (list): List of location data [zone_id, easting, northing, zone]
|
||||||
|
"""
|
||||||
|
# Group results by zone and collect all unique dates
|
||||||
|
zone_data = {}
|
||||||
|
for loc in locations:
|
||||||
|
zone_id = loc[0]
|
||||||
|
zone_name = loc[3]
|
||||||
|
|
||||||
df = pd.DataFrame({"datetime": data['dates'], zone_id: data['values']})
|
if zone_name not in zone_data:
|
||||||
|
zone_data[zone_name] = {"dates": [], "values": {}}
|
||||||
|
|
||||||
# Sort the dataframe into date order
|
zone_data[zone_name]["values"][zone_id] = results[zone_id]["values"]
|
||||||
|
zone_data[zone_name]["dates"].extend(results[zone_id]["dates"])
|
||||||
|
|
||||||
|
# Get unique sorted dates across all zones
|
||||||
|
for zone_name, data in zone_data.items():
|
||||||
|
data["dates"] = sorted(set(data["dates"]))
|
||||||
|
|
||||||
|
# Now write one CSV per zone with aligned timestamps
|
||||||
|
for zone_name, data in zone_data.items():
|
||||||
|
dates = data["dates"]
|
||||||
|
values_dict = data["values"]
|
||||||
|
|
||||||
|
# Create aligned DataFrame
|
||||||
|
df_dict = {"datetime": dates}
|
||||||
|
for grid_square, values in values_dict.items():
|
||||||
|
# Align values to the common dates
|
||||||
|
aligned_values = []
|
||||||
|
value_iter = iter(values)
|
||||||
|
date_iter = iter(dates)
|
||||||
|
|
||||||
|
current_date = next(date_iter, None)
|
||||||
|
current_value = next(value_iter, None)
|
||||||
|
|
||||||
|
for expected_date in dates:
|
||||||
|
if current_date == expected_date:
|
||||||
|
aligned_values.append(current_value)
|
||||||
|
try:
|
||||||
|
current_date = next(date_iter)
|
||||||
|
current_value = next(value_iter)
|
||||||
|
except StopIteration:
|
||||||
|
current_date = None
|
||||||
|
current_value = None
|
||||||
|
else:
|
||||||
|
aligned_values.append(None) # Missing value
|
||||||
|
|
||||||
|
df_dict[grid_square] = aligned_values
|
||||||
|
|
||||||
|
df = pd.DataFrame(df_dict)
|
||||||
|
|
||||||
|
# Sort by datetime (already sorted)
|
||||||
sorted_df = df.sort("datetime")
|
sorted_df = df.sort("datetime")
|
||||||
|
|
||||||
# Format datetime column
|
# Format datetime column
|
||||||
@@ -184,9 +227,9 @@ class GenerateTimeseries:
|
|||||||
pd.col("datetime").dt.strftime("%Y-%m-%d %H:%M:%S")
|
pd.col("datetime").dt.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
)
|
)
|
||||||
|
|
||||||
output_path = Path(self.config.CSV_TOP_FOLDER) / f"{zone_id}_timeseries_data.csv"
|
output_path = (
|
||||||
sorted_df.write_csv(
|
Path(self.config.COMBINED_FOLDER) / f"{zone_name}_timeseries_data.csv"
|
||||||
output_path,
|
|
||||||
float_precision=4
|
|
||||||
)
|
)
|
||||||
print("All CSV files written.")
|
sorted_df.write_csv(output_path, float_precision=4)
|
||||||
|
|
||||||
|
logging.info("All CSV files written.")
|
||||||
|
|||||||
+5
-3
@@ -258,7 +258,7 @@ class Nimrod:
|
|||||||
# Read data as big-endian 16-bit integers
|
# Read data as big-endian 16-bit integers
|
||||||
# numpy.frombuffer is efficient for reading from bytes
|
# numpy.frombuffer is efficient for reading from bytes
|
||||||
data_bytes = infile.read(array_size * 2)
|
data_bytes = infile.read(array_size * 2)
|
||||||
self.data = np.frombuffer(data_bytes, dtype='>h').astype(np.int16)
|
self.data = np.frombuffer(data_bytes, dtype=">h").astype(np.int16)
|
||||||
|
|
||||||
# Reshape to (nrows, ncols) for easier 2D manipulation
|
# Reshape to (nrows, ncols) for easier 2D manipulation
|
||||||
# Note: NIMROD data is row-major (C-style), starting from top-left
|
# Note: NIMROD data is row-major (C-style), starting from top-left
|
||||||
@@ -392,7 +392,9 @@ class Nimrod:
|
|||||||
# Use numpy slicing to extract the sub-array
|
# Use numpy slicing to extract the sub-array
|
||||||
# Note: y indices correspond to rows, x indices to columns
|
# Note: y indices correspond to rows, x indices to columns
|
||||||
# Slicing is [start:end], so we need +1 for the end index
|
# Slicing is [start:end], so we need +1 for the end index
|
||||||
self.data = self.data[yMinPixelId : yMaxPixelId + 1, xMinPixelId : xMaxPixelId + 1]
|
self.data = self.data[
|
||||||
|
yMinPixelId : yMaxPixelId + 1, xMinPixelId : xMaxPixelId + 1
|
||||||
|
]
|
||||||
|
|
||||||
# Update object where necessary
|
# Update object where necessary
|
||||||
self.x_right = self.x_left + xMaxPixelId * self.x_pixel_size
|
self.x_right = self.x_left + xMaxPixelId * self.x_pixel_size
|
||||||
@@ -435,7 +437,7 @@ class Nimrod:
|
|||||||
|
|
||||||
# Write raster data to output file using numpy.savetxt
|
# Write raster data to output file using numpy.savetxt
|
||||||
# This is significantly faster than iterating in Python
|
# This is significantly faster than iterating in Python
|
||||||
np.savetxt(outfile, self.data, fmt='%d', delimiter=' ')
|
np.savetxt(outfile, self.data, fmt="%d", delimiter=" ")
|
||||||
outfile.close()
|
outfile.close()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
+1
-1
@@ -1,6 +1,6 @@
|
|||||||
[project]
|
[project]
|
||||||
name = "met-office"
|
name = "met-office"
|
||||||
version = "1.0.0"
|
version = "1.1.1"
|
||||||
description = "Convert .dat nimrod files to .asc files"
|
description = "Convert .dat nimrod files to .asc files"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
requires-python = ">=3.14"
|
requires-python = ">=3.14"
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ requires-python = ">=3.14"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "met-office"
|
name = "met-office"
|
||||||
version = "1.0.0"
|
version = "1.1.1"
|
||||||
source = { virtual = "." }
|
source = { virtual = "." }
|
||||||
dependencies = [
|
dependencies = [
|
||||||
{ name = "numpy" },
|
{ name = "numpy" },
|
||||||
|
|||||||
Reference in New Issue
Block a user