feat: ✨ Reduced complexity & formatted files
config.py
@@ -4,5 +4,5 @@ class Config:
     COMBINED_FOLDER = "./combined_files"
     ZONE_FOLDER = "./zone_inputs"

-    delete_dat_after_processing = True
+    delete_dat_after_processing = False
     delete_asc_after_processing = True
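For orientation: these settings are plain class attributes, read straight off Config (e.g. Config.COMBINED_FOLDER) by the code below. A minimal sketch of the class this hunk implies — the two folder attributes not shown in the hunk (DAT_TOP_FOLDER and ASC_TOP_FOLDER, referenced later in the diff) are given assumed values:

class Config:
    # Only the attribute names below appear in the diff; the first two
    # paths are assumptions for illustration.
    DAT_TOP_FOLDER = "./dat_inputs"
    ASC_TOP_FOLDER = "./asc_outputs"
    COMBINED_FOLDER = "./combined_files"
    ZONE_FOLDER = "./zone_inputs"

    # After this commit the raw .dat files are kept, while intermediate
    # .asc files are still deleted once their values are extracted.
    delete_dat_after_processing = False
    delete_asc_after_processing = True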
@@ -6,7 +6,7 @@ import concurrent.futures
 from pathlib import Path

 from config import Config
-from modules import BatchNimrod, GenerateTimeseries, CombineTimeseries
+from modules import BatchNimrod, GenerateTimeseries

 logging.basicConfig(
     level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
@@ -17,29 +17,33 @@ if __name__ == "__main__":
     os.makedirs(Path(Config.COMBINED_FOLDER), exist_ok=True)

     locations = []
+    zones = set()
     # load zone inputs here
     for file in os.listdir(Path(Config.ZONE_FOLDER)):
-        with open(Path(Config.ZONE_FOLDER,file), 'r') as csvfile:
+        with open(Path(Config.ZONE_FOLDER, file), "r") as csvfile:
             reader = csv.reader(csvfile)
             header = next(reader)  # Skip header row
             for row in reader:
                 # Extract the relevant fields: 1K Grid, Easting, Northing, Zone
-                zone_id = row[0]  # Ossheet column
+                grid_name = row[0]  # 1k Grid name
                 easting = int(row[1])  # Easting column
                 northing = int(row[2])  # Northing column
                 zone = int(row[3])  # ZoneID column
-                locations.append([zone_id, easting, northing, zone])
-    logging.info(f'Count of 1K Grids: {len(locations)}')
+                locations.append([grid_name, easting, northing, zone])
+                zones.add(zone)
+    logging.info(f"Count of 1km Grids: {len(locations)}")
+    logging.info(f"Count of Zones: {len(zones)}")

     batch = BatchNimrod(Config)
-    timeseries = GenerateTimeseries(Config)
-    combiner = CombineTimeseries(Config, locations)
+    timeseries = GenerateTimeseries(Config, locations)

     start = time.time()
-    logging.info("Starting interleaved processing of DAT files and Timeseries generation")
+    logging.info(
+        "Starting interleaved processing of DAT files and Timeseries generation"
+    )

     # Initialize results structure
-    results = {loc[0]: {'dates': [], 'values': []} for loc in locations}
+    results = {loc[0]: {"dates": [], "values": []} for loc in locations}

     def process_pipeline(dat_file):
         # 1. Process DAT to ASC
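The loop above reads every CSV in ZONE_FOLDER, skips the header, and keeps four columns per row: 1km grid name, easting, northing, and zone ID. A standalone sketch of the same parsing against a hypothetical zones.csv (all values invented for illustration):

import csv

# zones.csv:
#   grid,easting,northing,zone
#   SU1234,412000,134000,1
#   SU1235,413000,134000,1
#   SK9021,490000,321000,2

locations = []
zones = set()
with open("zones.csv", newline="") as csvfile:
    reader = csv.reader(csvfile)
    next(reader)  # skip the header row
    for row in reader:
        # grid name stays a string; coordinates and zone become ints
        locations.append([row[0], int(row[1]), int(row[2]), int(row[3])])
        zones.add(int(row[3]))

print(locations[0])  # ['SU1234', 412000, 134000, 1]
print(zones)         # {1, 2}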
@@ -52,7 +56,9 @@ if __name__ == "__main__":
         return file_results

     # Get list of DAT files
-    dat_files = [f for f in os.listdir(Path(Config.DAT_TOP_FOLDER)) if not f.startswith('.')]
+    dat_files = [
+        f for f in os.listdir(Path(Config.DAT_TOP_FOLDER)) if not f.startswith(".")
+    ]
     total_files = len(dat_files)

     logging.info(f"Processing {total_files} files concurrently...")
@@ -69,9 +75,9 @@ if __name__ == "__main__":
                 file_results = future.result()
                 if file_results:
                     for res in file_results:
-                        zone_id = res['zone_id']
-                        results[zone_id]['dates'].append(res['date'])
-                        results[zone_id]['values'].append(res['value'])
+                        zone_id = res["zone_id"]
+                        results[zone_id]["dates"].append(res["date"])
+                        results[zone_id]["values"].append(res["value"])

                 completed_count += 1
                 if completed_count % 100 == 0:
@@ -79,8 +85,8 @@ if __name__ == "__main__":
                     files_per_minute = (completed_count / elapsed_time) * 60
                     remaining_files = total_files - completed_count
                     eta_minutes = remaining_files / (files_per_minute / 60) / 60
-                    logging.info(f'''Processed {completed_count} out of {total_files} files.
-Speed: {files_per_minute:.2f} files/min. ETA: {eta_minutes:.2f} minutes''')
+                    logging.info(f"""Processed {completed_count} out of {total_files} files.
+Speed: {files_per_minute:.2f} files/min. ETA: {eta_minutes:.2f} minutes""")
     except KeyboardInterrupt:
         logging.warning("KeyboardInterrupt received. Cancelling pending tasks...")
         executor.shutdown(wait=False, cancel_futures=True)
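The progress block above is plain throughput arithmetic: files_per_minute is the observed rate, files_per_minute / 60 converts it back to files per second, so the first division gives seconds remaining and the trailing / 60 gives minutes. A worked example with assumed numbers:

# Assumed snapshot: 300 of 1200 files done after 600 seconds
completed_count, total_files, elapsed_time = 300, 1200, 600.0

files_per_minute = (completed_count / elapsed_time) * 60    # 30.0
remaining_files = total_files - completed_count             # 900
eta_seconds = remaining_files / (files_per_minute / 60)     # 900 / 0.5 = 1800.0
eta_minutes = eta_seconds / 60                              # 30.0
print(f"Speed: {files_per_minute:.2f} files/min. ETA: {eta_minutes:.2f} minutes")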
modules/__init__.py
@@ -1,11 +1,9 @@
 from .nimrod import Nimrod
 from .batch_nimrod import BatchNimrod
 from .generate_timeseries import GenerateTimeseries
-from .combine_timeseries import CombineTimeseries

 __all__ = [
     "Nimrod",
     "BatchNimrod",
     "GenerateTimeseries",
-    "CombineTimeseries"
 ]
modules/batch_nimrod.py (+11 -5)
@@ -5,7 +5,6 @@ import logging
 import concurrent.futures


-
 class BatchNimrod:
     def __init__(self, config) -> None:
         self.config = config
@@ -59,7 +58,11 @@
         box for each area, and exports clipped raster data to OUT_TOP_FOLDER.
         """
         # Read all file names in the folder
-        files_to_process = [f for f in os.listdir(Path(self.config.DAT_TOP_FOLDER)) if not f.startswith('.')]
+        files_to_process = [
+            f
+            for f in os.listdir(Path(self.config.DAT_TOP_FOLDER))
+            if not f.startswith(".")
+        ]
         total_files = len(files_to_process)

         logging.info(f"Processing {total_files} files concurrently...")
@@ -76,9 +79,12 @@ class BatchNimrod:
                 for future in concurrent.futures.as_completed(future_to_file):
                     completed_count += 1
                     if completed_count % 10 == 0:
-                        logging.info(f'processed {completed_count} out of {total_files} files')
+                        logging.info(
+                            f"processed {completed_count} out of {total_files} files"
+                        )
             except KeyboardInterrupt:
-                logging.warning("KeyboardInterrupt received. Cancelling pending tasks...")
+                logging.warning(
+                    "KeyboardInterrupt received. Cancelling pending tasks..."
+                )
                 executor.shutdown(wait=False, cancel_futures=True)
-
-
+                raise
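The pattern being reformatted here is worth noting: executor.shutdown(wait=False, cancel_futures=True) (available since Python 3.9) discards queued work on Ctrl-C, and the newly added raise re-propagates the interrupt instead of swallowing it. A self-contained sketch of the same pattern, with a dummy task standing in for the real DAT processing:

import concurrent.futures
import logging
import time

def process_file(name: str) -> str:
    time.sleep(0.1)  # stand-in for the real DAT -> ASC work
    return name

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    files = [f"file_{i:03d}.dat" for i in range(100)]
    executor = concurrent.futures.ProcessPoolExecutor()
    try:
        future_to_file = {executor.submit(process_file, f): f for f in files}
        for done, future in enumerate(concurrent.futures.as_completed(future_to_file), 1):
            future.result()
            if done % 10 == 0:
                logging.info(f"processed {done} out of {len(files)} files")
        executor.shutdown()
    except KeyboardInterrupt:
        logging.warning("KeyboardInterrupt received. Cancelling pending tasks...")
        executor.shutdown(wait=False, cancel_futures=True)  # drop not-yet-started tasks
        raise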
modules/combine_timeseries.py (deleted)
@@ -1,16 +0,0 @@
-import logging
-
-class CombineTimeseries:
-    def __init__(self, config, locations):
-        self.config = config
-        self.locations = locations
-        self.grouped_locations = {}
-        self.build_location_groups()
-
-    def build_location_groups(self):
-        for location in self.locations:
-            group = location[3]  # zone number
-            if group not in self.grouped_locations:
-                self.grouped_locations[group] = []
-            self.grouped_locations[group].append(location)
-        logging.info(f'Count of zones: {len(self.grouped_locations)}')
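The deleted helper was a plain group-by on the zone field (index 3 of each location). For reference, the equivalent in a few lines with collections.defaultdict, using invented sample data:

from collections import defaultdict

locations = [
    ["SU1234", 412000, 134000, 1],
    ["SU1235", 413000, 134000, 1],
    ["SK9021", 490000, 321000, 2],
]

grouped_locations = defaultdict(list)
for location in locations:
    grouped_locations[location[3]].append(location)  # key on the zone number

print(len(grouped_locations))  # 2 zones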
modules/generate_timeseries.py
@@ -8,10 +8,10 @@ import concurrent.futures
 import logging


-
 class GenerateTimeseries:
-    def __init__(self, config):
+    def __init__(self, config, locations):
         self.config = config
+        self.locations = locations

     def _read_ascii_header(self, ascii_raster_file: str) -> list:
         """Reads header information from an ASCII DEM
@@ -76,7 +76,7 @@ class GenerateTimeseries:
             or None if processing fails.
             Format: [{'zone_id': id, 'date': datetime, 'value': float}, ...]
         """
-        if not file_name.endswith('.asc'):
+        if not file_name.endswith(".asc"):
             return None

         file_path = Path(self.config.ASC_TOP_FOLDER, file_name)
@@ -112,18 +112,13 @@
                     # print(f"Warning: Crop too small for {zone_id} in {file_name}")
                     val = 0.0

-                results.append({
-                    'zone_id': zone_id,
-                    'date': parsed_date,
-                    'value': val
-                })
+                results.append({"zone_id": zone_id, "date": parsed_date, "value": val})

             if self.config.delete_asc_after_processing:
                 os.remove(file_path)

             return results

-
         except Exception as e:
             print(f"Error processing file {file_name}: {e}")
             return None
@@ -135,7 +130,7 @@ class GenerateTimeseries:
             locations (list): List of location data [zone_id, easting, northing, zone]
         """
         # Initialize data structure to hold results: {zone_id: {'dates': [], 'values': []}}
-        results = {loc[0]: {'dates': [], 'values': []} for loc in locations}
+        results = {loc[0]: {"dates": [], "values": []} for loc in locations}

         # Get list of ASC files
         asc_files = sorted(os.listdir(Path(self.config.ASC_TOP_FOLDER)))
@@ -157,9 +152,9 @@
                     file_results = future.result()
                     if file_results:
                         for res in file_results:
-                            zone_id = res['zone_id']
-                            results[zone_id]['dates'].append(res['date'])
-                            results[zone_id]['values'].append(res['value'])
+                            zone_id = res["zone_id"]
+                            results[zone_id]["dates"].append(res["date"])
+                            results[zone_id]["values"].append(res["value"])

                     completed_count += 1
                     if completed_count % 100 == 0:
@@ -176,9 +171,6 @@ class GenerateTimeseries:
             results (dict): Aggregated results {zone_id: {'dates': [], 'values': []}}
             locations (list): List of location data [zone_id, easting, northing, zone]
         """
-        # Map zone_id -> zone
-        zone_map = {loc[0]: loc[3] for loc in locations}
-
         # Group results by zone and collect all unique dates
         zone_data = {}
         for loc in locations:
@@ -186,19 +178,19 @@ class GenerateTimeseries:
             zone_name = loc[3]

             if zone_name not in zone_data:
-                zone_data[zone_name] = {'dates': [], 'values': {}}
+                zone_data[zone_name] = {"dates": [], "values": {}}

-            zone_data[zone_name]['values'][zone_id] = results[zone_id]['values']
-            zone_data[zone_name]['dates'].extend(results[zone_id]['dates'])
+            zone_data[zone_name]["values"][zone_id] = results[zone_id]["values"]
+            zone_data[zone_name]["dates"].extend(results[zone_id]["dates"])

         # Get unique sorted dates across all zones
         for zone_name, data in zone_data.items():
-            data['dates'] = sorted(set(data['dates']))
+            data["dates"] = sorted(set(data["dates"]))

         # Now write one CSV per zone with aligned timestamps
         for zone_name, data in zone_data.items():
-            dates = data['dates']
-            values_dict = data['values']
+            dates = data["dates"]
+            values_dict = data["values"]

             # Create aligned DataFrame
             df_dict = {"datetime": dates}
@@ -235,7 +227,9 @@
                 pd.col("datetime").dt.strftime("%Y-%m-%d %H:%M:%S")
             )

-            output_path = Path(self.config.COMBINED_FOLDER) / f"{zone_name}_timeseries_data.csv"
+            output_path = (
+                Path(self.config.COMBINED_FOLDER) / f"{zone_name}_timeseries_data.csv"
+            )
             sorted_df.write_csv(output_path, float_precision=4)

         logging.info("All CSV files written.")
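write_csv and float_precision are Polars API, so pd in these hunks appears to be an alias for polars rather than pandas. A minimal sketch of the align-sort-format-write sequence under that assumption (column names and values invented):

from datetime import datetime
import polars as pl

# One datetime column plus one value column per 1km grid
df_dict = {
    "datetime": [datetime(2024, 1, 1, 0, 15), datetime(2024, 1, 1, 0, 0)],
    "SU1234": [1.25, 0.0],
    "SU1235": [0.0, 0.5],
}

sorted_df = pl.DataFrame(df_dict).sort("datetime").with_columns(
    pl.col("datetime").dt.strftime("%Y-%m-%d %H:%M:%S")
)
sorted_df.write_csv("1_timeseries_data.csv", float_precision=4)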
modules/nimrod.py (+5 -3)
@@ -258,7 +258,7 @@ class Nimrod:
         # Read data as big-endian 16-bit integers
         # numpy.frombuffer is efficient for reading from bytes
         data_bytes = infile.read(array_size * 2)
-        self.data = np.frombuffer(data_bytes, dtype='>h').astype(np.int16)
+        self.data = np.frombuffer(data_bytes, dtype=">h").astype(np.int16)

         # Reshape to (nrows, ncols) for easier 2D manipulation
         # Note: NIMROD data is row-major (C-style), starting from top-left
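dtype=">h" is NumPy notation for big-endian signed 16-bit integers, matching the NIMROD on-disk byte order; .astype(np.int16) then converts to the machine's native layout. A two-value demonstration:

import numpy as np

data_bytes = b"\x00\x01\xff\xff"  # 0x0001 -> 1, 0xffff -> -1 (big-endian int16)
values = np.frombuffer(data_bytes, dtype=">h").astype(np.int16)
print(values)  # [ 1 -1]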
@@ -392,7 +392,9 @@
         # Use numpy slicing to extract the sub-array
         # Note: y indices correspond to rows, x indices to columns
         # Slicing is [start:end], so we need +1 for the end index
-        self.data = self.data[yMinPixelId : yMaxPixelId + 1, xMinPixelId : xMaxPixelId + 1]
+        self.data = self.data[
+            yMinPixelId : yMaxPixelId + 1, xMinPixelId : xMaxPixelId + 1
+        ]

         # Update object where necessary
         self.x_right = self.x_left + xMaxPixelId * self.x_pixel_size
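The + 1 on each end index is needed because the pixel IDs are inclusive bounds while NumPy slices are half-open. A quick check:

import numpy as np

data = np.arange(25).reshape(5, 5)
y_min, y_max, x_min, x_max = 1, 3, 2, 4  # inclusive pixel bounds
clipped = data[y_min : y_max + 1, x_min : x_max + 1]
print(clipped.shape)  # (3, 3): rows 1-3 and columns 2-4, both ends included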
@@ -435,7 +437,7 @@

         # Write raster data to output file using numpy.savetxt
         # This is significantly faster than iterating in Python
-        np.savetxt(outfile, self.data, fmt='%d', delimiter=' ')
+        np.savetxt(outfile, self.data, fmt="%d", delimiter=" ")
         outfile.close()

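np.savetxt streams the matrix row by row with the given format string, which is why it beats a Python loop here; outfile is an already-open handle, presumably with the ASCII-grid header written first. A minimal sketch (header values are placeholders, not taken from the source):

import numpy as np

data = np.array([[0, 1], [2, 3]], dtype=np.int16)
with open("clip.asc", "w") as outfile:
    # Standard ESRI ASCII grid header; values invented for illustration
    outfile.write("ncols 2\nnrows 2\nxllcorner 0\nyllcorner 0\ncellsize 1000\nNODATA_value -1\n")
    np.savetxt(outfile, data, fmt="%d", delimiter=" ")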
pyproject.toml (+1 -1)
@@ -1,6 +1,6 @@
 [project]
 name = "met-office"
-version = "1.0.0"
+version = "1.1.0"
 description = "Convert .dat nimrod files to .asc files"
 readme = "README.md"
 requires-python = ">=3.14"