"""IFS TCO mesh loading.
This module provides functionality for loading IFS TCO meshes as xr.Dataset
objects with standardized variable names.
"""
from __future__ import annotations
import os
from pathlib import Path
from typing import TYPE_CHECKING
import numpy as np
import xarray as xr
from nereus.core.mesh import (
add_mesh_metadata,
normalize_lon,
should_use_dask,
)
if TYPE_CHECKING:
pass
[docs]
def load_mesh(
grid_file: str | os.PathLike,
area_file: str | os.PathLike,
*,
use_dask: bool | None = None,
) -> xr.Dataset:
"""Load IFS TCO mesh from grid and area files.
Parameters
----------
grid_file : str or Path
Path to the grid NetCDF file containing A*.lon and A*.lat.
area_file : str or Path
Path to the area NetCDF file containing A*.srf.
use_dask : bool, optional
Whether to use dask arrays. Auto-detects if None.
Returns
-------
xr.Dataset
Standardized mesh dataset with:
- lon, lat: Coordinates (npoints,)
- area: Cell area in m^2 (npoints,)
Plus original variables with their native names.
"""
grid_path = Path(grid_file)
area_path = Path(area_file)
if not grid_path.exists():
raise FileNotFoundError(f"Grid file not found: {grid_path}")
if not area_path.exists():
raise FileNotFoundError(f"Area file not found: {area_path}")
with xr.open_dataset(grid_path) as ds_grid_peek, xr.open_dataset(area_path) as ds_area_peek:
prefix = _select_prefix(ds_grid_peek, ds_area_peek)
lon_name = f"{prefix}.lon"
npoints = ds_grid_peek[lon_name].size
use_dask_actual = should_use_dask(npoints, use_dask)
if use_dask_actual:
ds_grid = xr.open_dataset(grid_path, chunks={})
ds_area = xr.open_dataset(area_path, chunks={})
else:
ds_grid = xr.open_dataset(grid_path)
ds_area = xr.open_dataset(area_path)
try:
lon_var = f"{prefix}.lon"
lat_var = f"{prefix}.lat"
area_var = f"{prefix}.srf"
lon_da = ds_grid[lon_var]
lat_da = ds_grid[lat_var]
area_da = ds_area[area_var]
if lon_da.dims != lat_da.dims:
raise ValueError(
f"lon/lat dimension mismatch: {lon_da.dims} vs {lat_da.dims}"
)
lon_data = lon_da.data if use_dask_actual else lon_da.values
lat_data = lat_da.data if use_dask_actual else lat_da.values
area_data = area_da.data if use_dask_actual else area_da.values
if use_dask_actual:
import dask.array as da
lon_data = da.map_blocks(
lambda x: normalize_lon(x, "pm180"),
lon_data,
dtype=np.float64,
)
lon_flat = da.ravel(lon_data)
lat_flat = da.ravel(lat_data)
area_flat = da.ravel(area_data)
else:
lon_flat = np.ravel(normalize_lon(lon_data, "pm180"))
lat_flat = np.ravel(lat_data)
area_flat = np.ravel(area_data)
ds = xr.Dataset(
{
"lon": (("npoints",), lon_flat, {
"units": "degrees_east",
"long_name": "Longitude",
"standard_name": "longitude",
}),
"lat": (("npoints",), lat_flat, {
"units": "degrees_north",
"long_name": "Latitude",
"standard_name": "latitude",
}),
"area": (("npoints",), area_flat, {
"units": "m2",
"long_name": "Cell area",
}),
},
attrs={
"ifs_tco_prefix": prefix,
"original_shape": lon_da.shape,
},
)
_copy_variables(ds, ds_grid, use_dask_actual, skip_existing=True)
_copy_variables(ds, ds_area, use_dask_actual, skip_existing=True)
finally:
if not use_dask_actual:
ds_grid.close()
ds_area.close()
ds.attrs["ifs_tco_grid_file"] = str(grid_path)
ds.attrs["ifs_tco_area_file"] = str(area_path)
return add_mesh_metadata(ds, "ifs_tco", grid_path, use_dask=use_dask_actual)
def _select_prefix(ds_grid: xr.Dataset, ds_area: xr.Dataset) -> str:
"""Select A* prefix present in grid and area files."""
lon_prefixes = _collect_prefixes(ds_grid, "lon")
lat_prefixes = _collect_prefixes(ds_grid, "lat")
area_prefixes = _collect_prefixes(ds_area, "srf")
candidates = lon_prefixes & lat_prefixes & area_prefixes
if not candidates:
raise ValueError(
"Could not find matching A* prefix for lon/lat and area. "
f"lon prefixes: {sorted(lon_prefixes)}, "
f"lat prefixes: {sorted(lat_prefixes)}, "
f"area prefixes: {sorted(area_prefixes)}"
)
return sorted(candidates)[0]
def _collect_prefixes(ds: xr.Dataset, suffix: str) -> set[str]:
"""Collect prefixes for variables matching A*.suffix."""
prefixes: set[str] = set()
for name in ds.variables:
if not name.startswith("A"):
continue
if not name.endswith(f".{suffix}"):
continue
prefix = name.split(".", 1)[0]
if prefix:
prefixes.add(prefix)
return prefixes
def _copy_variables(
dest: xr.Dataset,
src: xr.Dataset,
use_dask: bool,
*,
skip_existing: bool = False,
) -> None:
"""Copy variables and coordinates from source dataset to destination."""
for name in src.data_vars:
if skip_existing and name in dest:
continue
var = src[name]
data = var.data if use_dask else var.values
dest[name] = xr.DataArray(data, dims=var.dims, attrs=var.attrs)
for name in src.coords:
if skip_existing and (name in dest or name in dest.coords):
continue
coord = src[name]
data = coord.data if use_dask else coord.values
dims = coord.dims if coord.dims else ()
dest.coords[name] = xr.DataArray(data, dims=dims, attrs=coord.attrs)