chore: 🔧 More cleaning

2025-11-11 11:54:28 +00:00
parent 71af242dcb
commit b7d0f6cd99
6 changed files with 130 additions and 171 deletions
config.py +4 -34
@@ -1,36 +1,6 @@
-import yaml
-import logging
-
 class Config:
-    def __init__(self) -> None:
-        self.IN_TOP_FOLDER = "./dat_files"
-        self.OUT_TOP_FOLDER = "./asc_files"
-        self.CSV_TOP_FOLER = "./csv_files"
-        self.AREAS_FILE = 'areas.csv'
-
-    def load_areas(self) -> dict:
-        """
-        Load configuration from YAML file.
-
-        Returns:
-            dict: Configuration dictionary containing bounding box information.
-
-        Raises:
-            FileNotFoundError: If the config.yaml file is not found.
-            yaml.YAMLError: If there's an error parsing the YAML file.
-        """
-        try:
-            with open(CONFIG_FILE, "r") as file:
-                config = yaml.safe_load(file)
-                return config.get("bounding_box_info", {})
-        except FileNotFoundError:
-            logging.error(
-                f"Config file {CONFIG_FILE} not found. Using default configuration."
-            )
-            return {}
-        except yaml.YAMLError as e:
-            logging.error(f"Error parsing YAML file: {e}")
-            return {}
+    DAT_TOP_FOLDER = "./dat_files"
+    ASC_TOP_FOLDER = "./asc_files"
+    CSV_TOP_FOLDER = "./csv_files"
+    AREAS_FILE = 'areas.csv'
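The refactor drops the YAML loading entirely: paths become plain class attributes, and callers read them off the class object itself, which is how the main script passes `Config` (uninstantiated) into `BatchNimrod` and `GenerateTimeseries` below. A minimal sketch of that pattern; the `describe` helper is hypothetical, standing in for those consumers:

import pathlib


class Config:
    # Class attributes: shared settings, no __init__ and no YAML parsing.
    DAT_TOP_FOLDER = "./dat_files"
    ASC_TOP_FOLDER = "./asc_files"
    CSV_TOP_FOLDER = "./csv_files"
    AREAS_FILE = 'areas.csv'


def describe(config) -> None:
    # Consumers receive the class object itself, mirroring BatchNimrod(Config).
    print(pathlib.Path(config.DAT_TOP_FOLDER), pathlib.Path(config.ASC_TOP_FOLDER))


describe(Config)  # note: no Config() instantiation needed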
-3
@@ -1,3 +0,0 @@
-IN_TOP_FOLDER: "./dat_files"
-OUT_TOP_FOLDER: "./asc_files"
-CSV_TOP_FOLER: "./csv_files"
+38 -36
@@ -1,46 +1,48 @@
 import logging
-import yaml
 import time
 import os
 from pathlib import Path
-
-CONFIG_FILE = "config.yaml"
+
+from config import Config
+from modules import BatchNimrod, GenerateTimeseries

 logging.basicConfig(
     level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
 )

-def load_config() -> dict:
-    """
-    Load configuration from YAML file.
-
-    Returns:
-        dict: Configuration dictionary containing bounding box information.
-
-    Raises:
-        FileNotFoundError: If the config.yaml file is not found.
-        yaml.YAMLError: If there's an error parsing the YAML file.
-    """
-    try:
-        with open(CONFIG_FILE, "r") as file:
-            config = yaml.safe_load(file)
-            return config.get("bounding_box_info", {})
-    except FileNotFoundError:
-        logging.error(
-            f"Config file {CONFIG_FILE} not found. Using default configuration."
-        )
-        return {}
-    except yaml.YAMLError as e:
-        logging.error(f"Error parsing YAML file: {e}")
-        return {}
-
-os.makedirs(Path(OUT_TOP_FOLDER), exist_ok=True)
-os.makedirs(Path(CSV_TOP_FOLDER), exist_ok=True)
-
-# if __name__ == "__main__":
-#     start = time.time()
-#     process_nimrod_files()
-#     end = time.time()
-#     elapsed_time = end - start
-#     logging.info(f"Processing completed in {elapsed_time:.2f} seconds")
+if __name__ == "__main__":
+    os.makedirs(Path(Config.ASC_TOP_FOLDER), exist_ok=True)
+    os.makedirs(Path(Config.CSV_TOP_FOLDER), exist_ok=True)
+    dat_file_count = [f for f in os.listdir(Path(Config.DAT_TOP_FOLDER))]
+    asc_file_count = [f for f in os.listdir(Path(Config.ASC_TOP_FOLDER))]
+
+    locations = [
+        # loc name, loc id, x loc, y loc, resolution
+        ["BRICSC", "TM0816", 608500, 216500, 1000],
+        ["HEACSC", "TF6842", 568500, 342500, 1000],
+    ]
+
+    batch = BatchNimrod(Config)
+    timeseries = GenerateTimeseries(Config)
+
+    start = time.time()
+    logging.info("Starting to process DAT to ASC")
+    if dat_file_count != asc_file_count:
+        batch.process_nimrod_files()
+        batch_checkpoint = time.time()
+        elapsed_time = batch_checkpoint - start
+        logging.info(f"DAT to ASC completed in {elapsed_time:.2f} seconds")
+    else:
+        logging.info("No need to process DAT files, skipping...")
+        batch_checkpoint = time.time()  # set on this branch too; the loop below reads it
+        time.sleep(1)
+
+    for place in locations:
+        logging.info(f'{place[0]} started generating timeseries data.')
+        timeseries.extract_cropped_rain_data(place)
+        place_checkpoint = time.time()
+        since_asc_create = place_checkpoint - batch_checkpoint
+        elapsed_time = place_checkpoint - start
+        logging.info(f"{place[0]} completed in {since_asc_create:.2f} seconds")
+        logging.info(f'total time so far {elapsed_time:.2f} seconds')
+
+    logging.info('All Complete')
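One wrinkle in the new driver: `dat_file_count` and `asc_file_count` hold raw filename lists, not counts, and since the inputs keep their DAT names while the outputs are renamed to `<validity_time>.asc`, the two lists can never be equal, so the batch step always reruns. If a count comparison is the intent, a sketch of that guard (a hypothetical `needs_processing` helper, not part of this commit):

import os
from pathlib import Path


def needs_processing(dat_folder: str, asc_folder: str) -> bool:
    # Assumed intent: rerun the DAT -> ASC step only when the file counts differ.
    return len(os.listdir(Path(dat_folder))) != len(os.listdir(Path(asc_folder)))


# e.g. guarding the batch step:
# if needs_processing(Config.DAT_TOP_FOLDER, Config.ASC_TOP_FOLDER):
#     batch.process_nimrod_files()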
modules/__init__.py +2 -1
@@ -1,2 +1,3 @@
 from .nimrod import Nimrod
-from .batch_nimrod import process_nimrod_files
+from .batch_nimrod import BatchNimrod
+from .generate_timeseries import GenerateTimeseries
modules/batch_nimrod.py +5 -5
@@ -13,22 +13,22 @@ class BatchNimrod():
         Process all Nimrod files in the input directory, applying bounding box clipping
         and exporting to ASC format.

-        This function reads all files from IN_TOP_FOLDER, applies the appropriate bounding
+        This function reads all files from DAT_TOP_FOLDER, applies the appropriate bounding
         box for each area, and exports clipped raster data to OUT_TOP_FOLDER.
         """
         # Read all file names in the folder
-        files_to_process = [f for f in os.listdir(Path(self.config.IN_TOP_FOLDER))]
+        files_to_process = [f for f in os.listdir(Path(self.config.DAT_TOP_FOLDER))]
         logging.info(f"Processing {len(files_to_process)} files...")

-        for in_file in os.listdir(Path(self.config.IN_TOP_FOLDER)):
-            in_file_full = Path(self.config.IN_TOP_FOLDER, in_file)
+        for in_file in os.listdir(Path(self.config.DAT_TOP_FOLDER)):
+            in_file_full = Path(self.config.DAT_TOP_FOLDER, in_file)
             try:
                 image = Nimrod(open(in_file_full, "rb"))
                 out_file_name = f"{image.get_validity_time()}.asc"
-                out_file_path = Path(self.config.OUT_TOP_FOLDER, out_file_name)
+                out_file_path = Path(self.config.ASC_TOP_FOLDER, out_file_name)
                 with open(out_file_path, "w") as outfile:
                     image.extract_asc(outfile)
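`os.listdir` returns every entry in the folder, so anything that is not a Nimrod DAT file (a hidden file, a .gitkeep) would reach the binary parser. A hedged sketch of a stricter listing, illustration only, not what the commit does:

from pathlib import Path

# Illustration: keep only regular *.dat files; the literal folder path
# stands in for self.config.DAT_TOP_FOLDER.
dat_folder = Path("./dat_files")
files_to_process = sorted(p for p in dat_folder.glob("*.dat") if p.is_file())
print(f"Processing {len(files_to_process)} files...")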
modules/generate_timeseries.py +81 -92
@@ -4,124 +4,113 @@ import glob
 import pandas as pd
 from datetime import datetime

-# Configuration
-asc_path = "asc_files/"
-asc_wildcard_file = "*.asc"
-asc_mult_source = asc_path + asc_wildcard_file
-
-def read_ascii_header(ascii_raster_file: str) -> list:
-    """Reads header information from an ASCII DEM
-
-    Args:
-        ascii_raster_file (str): Path to the ASCII raster file
-
-    Returns:
-        list: Header data as a list of floats
-    """
-    with open(ascii_raster_file) as f:
-        header_data = [float(f.__next__().split()[1]) for x in range(6)]
-    return header_data
+class GenerateTimeseries:
+    def __init__(self, config):
+        self.config = config
+
+    def _read_ascii_header(self, ascii_raster_file: str) -> list:
+        """Reads header information from an ASCII DEM
+
+        Args:
+            ascii_raster_file (str): Path to the ASCII raster file
+
+        Returns:
+            list: Header data as a list of floats
+        """
+        with open(ascii_raster_file) as f:
+            header_data = [float(f.__next__().split()[1]) for x in range(6)]
+        return header_data
-def calculate_crop_coords(basin_header: list, radar_header: list) -> tuple:
-    """Calculate crop coordinates based on header data
-
-    Args:
-        basin_header (list): Basin header data
-        radar_header (list): Radar header data
-
-    Returns:
-        tuple: (start_col, start_row, end_col, end_row) as integers
-    """
-    y0_radar = radar_header[3]
-    x0_radar = radar_header[2]
-
-    y0_basin = basin_header[3]
-    x0_basin = basin_header[2]
-
-    nrows_radar = radar_header[1]
-
-    nrows_basin = 2  # hardcoded, we always expect 2 rows
-    ncols_basin = 2  # hardcoded, we always expect 2 columns
-
-    cellres_radar = radar_header[4]
-    cellres_basin = basin_header[4]
-
-    xp = x0_basin - x0_radar
-    yp = y0_basin - y0_radar
-
-    xpp = ncols_basin * cellres_basin
-    ypp = nrows_basin * cellres_basin
-
-    start_col = np.floor(xp / cellres_radar)
-    end_col = np.ceil((xpp + xp) / cellres_radar)
-
-    start_row = np.floor(nrows_radar - ((yp + ypp) / cellres_radar))
-    end_row = np.ceil(nrows_radar - (yp / cellres_radar))
-
-    #print(start_col, start_row, end_col, end_row)
-    return int(start_col), int(start_row), int(end_col), int(end_row)
+    def _calculate_crop_coords(self, basin_header: list, radar_header: list) -> tuple:
+        """Calculate crop coordinates based on header data
+
+        Args:
+            basin_header (list): Basin header data
+            radar_header (list): Radar header data
+
+        Returns:
+            tuple: (start_col, start_row, end_col, end_row) as integers
+        """
+        y0_radar = radar_header[3]
+        x0_radar = radar_header[2]
+
+        y0_basin = basin_header[3]
+        x0_basin = basin_header[2]
+
+        nrows_radar = radar_header[1]
+
+        nrows_basin = 2  # hardcoded, likely to change?
+        ncols_basin = 2  # hardcoded, likely to change?
+
+        cellres_radar = radar_header[4]
+        cellres_basin = basin_header[4]
+
+        xp = x0_basin - x0_radar
+        yp = y0_basin - y0_radar
+
+        xpp = ncols_basin * cellres_basin
+        ypp = nrows_basin * cellres_basin
+
+        start_col = np.floor(xp / cellres_radar)
+        end_col = np.ceil((xpp + xp) / cellres_radar)
+
+        start_row = np.floor(nrows_radar - ((yp + ypp) / cellres_radar))
+        end_row = np.ceil(nrows_radar - (yp / cellres_radar))
+
+        #print(start_col, start_row, end_col, end_row)
+        return int(start_col), int(start_row), int(end_col), int(end_row)
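To make the crop arithmetic concrete, here is a worked run for the BRICSC location against an assumed radar header; the nrows/x0/y0/cellsize values are invented for illustration, not taken from a real composite. The list follows the header order `_read_ascii_header` reads (ncols, nrows, xllcorner, yllcorner, cellsize, nodata):

import numpy as np

# Assumed radar header for the example: [ncols, nrows, xll, yll, cellsize, nodata]
radar_header = [100.0, 100.0, 600000.0, 200000.0, 1000.0, -9999.0]
basin = ["BRICSC", "TM0816", 608500, 216500, 1000]  # from `locations` above

xp = basin[2] - radar_header[2]   # 8500 m east of the radar grid origin
yp = basin[3] - radar_header[3]   # 16500 m north of the radar grid origin
xpp = 2 * basin[4]                # 2 columns * 1000 m
ypp = 2 * basin[4]                # 2 rows * 1000 m

start_col = int(np.floor(xp / radar_header[4]))                             # 8
end_col = int(np.ceil((xpp + xp) / radar_header[4]))                        # 11
start_row = int(np.floor(radar_header[1] - (yp + ypp) / radar_header[4]))   # 81
end_row = int(np.ceil(radar_header[1] - yp / radar_header[4]))              # 84

# The 2 km x 2 km window becomes a 3x3 cell crop after the floor/ceil rounding.
print(start_col, start_row, end_col, end_row)  # 8 81 11 84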
-def extract_cropped_rain_data(location):
-    """Extract cropped rain data and create rainfall timeseries
-
-    Returns:
-        None
-    """
-    rainfile = []
-
-    # Create datetime list
-    datetime_list = []
-
-    for f in glob.iglob(asc_mult_source):
-        # print(f)
-        radar_header = read_ascii_header(f)
-        start_col, start_row, end_col, end_row = calculate_crop_coords(
-            location, radar_header
-        )
-        start_col = int(round(start_col))
-        start_row = int(round(start_row))
-        end_col = int(round(end_col))
-        end_row = int(round(end_row))
-
-        cur_rawgrid = np.genfromtxt(
-            f, skip_header=6, filling_values=0.0, loose=True, invalid_raise=False
-        )
-        cur_croppedrain = cur_rawgrid[start_row:end_row, start_col:end_col]
-        # Flatten the cropped rain data into a 1D array
-        cur_rainrow = cur_croppedrain.flatten()
-        rainfile.append(cur_rainrow)
-
-        # Extract datetime from filename
-        filename = f.split("/")[-1]  # Get just the filename
-        # 20240929 0015
-        date_str = filename[:8]  # YYYYMMDD
-        time_str = filename[8:12]  # HHMM
-
-        # Parse datetime
-        parsed_date = datetime.strptime(f"{date_str}{time_str}", "%Y%m%d%H%M")
-        datetime_list.append(parsed_date)
-
-    rainfile_arr = np.vstack(rainfile)
-
-    # Create DataFrame with datetime index
-    df = pd.DataFrame(rainfile_arr, index=datetime_list)
-    # sort the dataframe into date order
-    sorted_df = df.sort_index()
-    # add headers
-    header_row = ['rainfall_1', 'rainfall_2', 'rainfall_3', 'rainfall_4']
-    file_name = f"csv_files/{location[0]}_timeseries_data.csv"
-    sorted_df.to_csv(file_name, sep=",", float_format="%1.4f", header=header_row, index_label='datetime')
+    def extract_cropped_rain_data(self, location):
+        """Extract cropped rain data and create rainfall timeseries
+
+        Returns:
+            None
+        """
+        rainfile = []
+        datetime_list = []
+
+        print(location)
+        for f in glob.iglob(f'{self.config.ASC_TOP_FOLDER}/*.asc'):
+            # print(f)
+            radar_header = self._read_ascii_header(f)
+            start_col, start_row, end_col, end_row = self._calculate_crop_coords(
+                location, radar_header
+            )
+            start_col = int(round(start_col))
+            start_row = int(round(start_row))
+            end_col = int(round(end_col))
+            end_row = int(round(end_row))
+
+            cur_rawgrid = np.genfromtxt(
+                f, skip_header=6, filling_values=0.0, loose=True, invalid_raise=False
+            )
+            cur_croppedrain = cur_rawgrid[start_row:end_row, start_col:end_col]
+            # Flatten the cropped rain data into a 1D array
+            cur_rainrow = cur_croppedrain.flatten()
+            # Third cell of the flattened crop; /32 presumably undoes the Nimrod
+            # integer scaling (values stored in units of 1/32 mm)
+            rainfile.append(cur_rainrow[2]/32)
+
+            # Extract datetime from filename
+            filename = f.split("/")[-1]  # Get just the filename
+            date_str = filename[:8]  # YYYYMMDD
+            time_str = filename[8:12]  # HHMM
+
+            # Parse datetime
+            parsed_date = datetime.strptime(f"{date_str}{time_str}", "%Y%m%d%H%M")
+            datetime_list.append(parsed_date)
+
+        rainfile_arr = np.vstack(rainfile)
+
+        # Create DataFrame with datetime index
+        df = pd.DataFrame(rainfile_arr, index=datetime_list)
+        # sort the dataframe into date order
+        sorted_df = df.sort_index()
+        # add headers
+        header_row = [location[1]]
+        file_name = f"csv_files/{location[0]}_timeseries_data.csv"
+        sorted_df.to_csv(file_name, sep=",", float_format="%1.4f", header=header_row, index_label='datetime')
-
-if __name__ == "__main__":
-    locations = [
-        # loc name, loc id, x loc, y loc, resolution
-        ["BRICSC", "TM0816", 608500, 216500, 1000],
-        ["HEACSC", "TF6842", 568500, 342500, 1000],
-    ]
-    for place in locations:
-        extract_cropped_rain_data(place)
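With the module-level __main__ block gone, the class is driven from the top-level script instead. A minimal standalone sketch of the refactored API, mirroring what the main script does (the ./asc_files and ./csv_files folders are assumed to exist and be populated):

from config import Config
from modules import GenerateTimeseries

timeseries = GenerateTimeseries(Config)

# loc name, loc id, x loc, y loc, resolution
place = ["BRICSC", "TM0816", 608500, 216500, 1000]
timeseries.extract_cropped_rain_data(place)
# -> writes csv_files/BRICSC_timeseries_data.csv with a TM0816 rainfall column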