# validation.py
"""
Tools for validating outputs and producing accuracy assessment metrics.

License: The code in this notebook is licensed under the Apache License,
Version 2.0 (https://www.apache.org/licenses/LICENSE-2.0). Digital Earth
Australia data is licensed under the Creative Commons by Attribution 4.0
license (https://creativecommons.org/licenses/by/4.0/).

Contact: If you need assistance, please post a question on the Open Data
Cube Discord chat (https://discord.com/invite/4hhBQVas5U) or on the GIS Stack
Exchange (https://gis.stackexchange.com/questions/ask?tags=open-data-cube)
using the `open-data-cube` tag (you can view previously asked questions
here: https://gis.stackexchange.com/questions/tagged/open-data-cube).

If you would like to report an issue with this script, you can file one
on GitHub (https://github.com/GeoscienceAustralia/dea-notebooks/issues/new).

Last modified: July 2025
"""
from math import sqrt
import geopandas as gpd
import numpy as np
import pandas as pd
import xarray as xr
from scipy import stats
from sklearn.metrics import mean_absolute_error, mean_squared_error
from .spatial import add_geobox
def eval_metrics(x, y, round=3, all_regress=False):
"""
    Calculate a set of common statistical metrics
    based on two input actual and predicted vectors.
    These include:

    - Pearson correlation
    - Root Mean Squared Error
    - Mean Absolute Error
    - R-squared
    - Bias
    - Linear regression parameters (slope,
      p-value, intercept, standard error)

    Parameters
    ----------
    x : numpy.array
        An array providing "actual" variable values
    y : numpy.array
        An array providing "predicted" variable values
    round : int
        Number of decimal places to round each metric
        to. Defaults to 3
    all_regress : bool
        Whether to return linear regression p-value,
        intercept and standard error (in addition to
        only regression slope). Defaults to False

    Returns
    -------
    pandas.Series
        A pandas.Series containing the calculated metrics
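
    Examples
    --------
    A minimal usage sketch with small synthetic arrays (the values below
    are illustrative only; NaN rows are dropped before metrics are
    calculated):

    >>> import numpy as np
    >>> actual = np.array([1.0, 2.0, 3.0, 4.0, np.nan])
    >>> predicted = np.array([1.1, 1.9, 3.2, 4.3, 5.0])
    >>> metrics = eval_metrics(actual, predicted, round=2)
    >>> "RMSE" in metrics.index
    True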
"""
# Create dataframe to drop na
xy_df = pd.DataFrame({"x": x, "y": y}).dropna()
# Compute linear regression
lin_reg = stats.linregress(x=xy_df.x, y=xy_df.y)
# Calculate statistics
stats_dict = {
"Correlation": xy_df.corr().iloc[0, 1],
"RMSE": sqrt(mean_squared_error(xy_df.x, xy_df.y)),
"MAE": mean_absolute_error(xy_df.x, xy_df.y),
"R-squared": lin_reg.rvalue**2,
"Bias": (xy_df.y - xy_df.x).mean(),
"Regression slope": lin_reg.slope,
}
# Additional regression params
if all_regress:
stats_dict.update({
"Regression p-value": lin_reg.pvalue,
"Regression intercept": lin_reg.intercept,
"Regression standard error": lin_reg.stderr,
})
    # Return metrics as a pandas Series, rounded to the requested precision
return pd.Series(stats_dict).round(round)
def xr_random_sampling(
da,
n=None,
sampling="stratified_random",
manual_class_ratios=None,
oversample_factor=5,
out_fname=None,
verbose=True,
):
"""
    Efficient and scalable random sampling of a 2D classified
    xarray.DataArray. Returns a GeoDataFrame of point samples based on
    the specified sampling strategy.

    Parameters
    ----------
    da : xarray.DataArray
        A classified 2-dimensional xarray.DataArray
    n : int
        Total number of points to sample. Required for all strategies
        except 'manual'; ignored if providing a dictionary of
        {class: numofpoints} to 'manual_class_ratios'
    sampling : str, optional
        The sampling strategy to use. Defaults to 'stratified_random'.
        Options include:
        'stratified_random' = Create points that are randomly
        distributed within each class, where each class has a
        number of points proportional to its relative area.
        'equal_stratified_random' = Create points that are randomly
        distributed within each class, where each class has the
        same number of points.
        'random' = Create points that are randomly distributed
        throughout the image.
        'manual' = User defined; each class is allocated a
        specified number of points. Supply a 'manual_class_ratios'
        dictionary mapping the number of points to each class.
    manual_class_ratios : dict, optional
        If setting sampling to 'manual', then provide a dictionary
        of type {'class': numofpoints} mapping the number of points
        to generate for each class.
    oversample_factor : float, optional (default=5)
        A multiplier used to increase the number of random candidate pixels
        initially drawn when sampling very large classes (>1 billion pixels).
        For such large classes, the function randomly samples a subset of
        pixel coordinates and checks which ones match the target class.
        To reduce the chance of undersampling, `oversample_factor` controls
        how many candidate coordinates are initially drawn.
        For example, if 100 samples are required and `oversample_factor=5`,
        500 random (x, y) coordinates will be sampled first. Only those
        matching the class will be retained and then randomly subsampled
        down to the desired number of samples. If too few valid matches
        are found, a warning is issued.
        Increasing this value can improve success rates when sampling
        sparse or spatially fragmented classes in large datasets, at the
        cost of more memory and computation.
    out_fname : str, optional
        If providing a filepath name, e.g. 'sample_points.geojson', the
        function will export a GeoJSON (or shapefile) of the sampling
        points to file.
    verbose : bool, optional (default=True)
        If True, print statements will track progress and print warnings

    Returns
    -------
    geopandas.GeoDataFrame
        A GeoDataFrame of sampled points, with a 'class' column and a
        point geometry column in the CRS of the input DataArray.
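
    Examples
    --------
    A minimal usage sketch, assuming `classified_da` is a classified 2D
    xarray.DataArray loaded elsewhere (the variable name and point count
    here are hypothetical):

    >>> points_gdf = xr_random_sampling(
    ...     classified_da,
    ...     n=500,
    ...     sampling="stratified_random",
    ...     verbose=False,
    ... )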
"""
    # Perform checks on the inputs; test the input type first so that
    # non-DataArray inputs fail with a meaningful error
    if not isinstance(da, xr.DataArray):
        raise ValueError("This function only accepts xarray.DataArrays as input")
    if sampling not in [
        "stratified_random",
        "equal_stratified_random",
        "random",
        "manual",
    ]:
        raise ValueError(
            "Sampling strategy must be one of 'stratified_random', "
            "'equal_stratified_random', 'random', or 'manual'"
        )
    if "time" in da.dims:
        raise ValueError("Input DataArray must not have a 'time' dimension.")
    if len(da.dims) > 2:
        raise ValueError("Input DataArray must not have more than two dimensions")
# Ensure da has a .odc.* accessor using odc.geo.
da = add_geobox(da)
# Obtain spatial dim names
y_dim, x_dim = da.odc.spatial_dims
# grab data as numpy arrays and count classes
data = da.values
unique_classes, class_counts = np.unique(data[~np.isnan(data)], return_counts=True)
unique_classes = unique_classes.astype(int)
# store our samples in a list
samples = []
if sampling == "random":
# first check num of samples doesn't exceed pixels
total_valid = (~np.isnan(data)).sum()
if n > total_valid:
raise ValueError("Requested more samples than available valid pixels.")
if verbose:
print(f"Sampling {n} points")
# determine flat indices of the non-Nans
flat_indices = np.flatnonzero(~np.isnan(data))
# sample the flat indices
sampled = np.random.choice(flat_indices, size=n, replace=False)
# get coords and class values from sample indices
for idx in sampled:
y, x = np.unravel_index(idx, data.shape)
y_val = da[y_dim].values[y]
x_val = da[x_dim].values[x]
cls = data[y, x]
samples.append((y_val, x_val, int(cls)))
elif sampling in ["stratified_random", "equal_stratified_random", "manual"]:
if sampling == "equal_stratified_random":
# divide n by the number of classes
n_per_class = int(np.ceil(n / len(unique_classes)))
class_sample_sizes = dict.fromkeys(unique_classes, n_per_class)
elif sampling == "stratified_random":
# calculate relative proportions of classes.
proportions = class_counts / class_counts.sum()
class_sample_sizes = {cls: int(np.round(n * prop)) for cls, prop in zip(unique_classes, proportions)}
elif sampling == "manual":
if not isinstance(manual_class_ratios, dict):
raise ValueError("Must provide manual_class_ratios for manual sampling.")
class_sample_sizes = {int(k): int(v) for k, v in manual_class_ratios.items()}
for cls in class_sample_sizes:
sample_size = class_sample_sizes[cls]
if verbose:
print(f"Class {cls}: sampling {sample_size} points")
class_count = (data == cls).sum()
            if class_count > 1e9:  # For very large classes, sample random coords first and check matches
                # Try oversampling until we get enough
                n_try = int(sample_size * oversample_factor)
                rand_x = np.random.choice(np.arange(da.sizes[x_dim]), n_try, replace=False)
                rand_y = np.random.choice(np.arange(da.sizes[y_dim]), n_try, replace=False)
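                # Pair the sampled row/column indices element-wise to form
                # candidate (y, x) pixel locations to test against the class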
# find matches with class id
match = data[rand_y, rand_x] == cls
rand_y, rand_x = rand_y[match], rand_x[match]
# check if matches is less than requested sample size
# and return samples with a warning
if len(rand_y) < sample_size:
if verbose:
print(
f"Warning: insufficient matches for class {cls}, "
f"try increasing oversampling. Returning {len(rand_y)} matches"
)
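                        # Keep all available matches (sampling size=len(rand_y)
                        # without replacement simply shuffles their order)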
idx = np.random.choice(np.arange(len(rand_y)), size=len(rand_y), replace=False)
for i in idx:
y = da[y_dim].values[rand_y[i]]
x = da[x_dim].values[rand_x[i]]
samples.append((y, x, cls))
else:
                    # If there are more matches than required, randomly subsample
                    # the matches so we get the right number of samples
idx = np.random.choice(np.arange(len(rand_y)), size=sample_size, replace=False)
for i in idx:
y = da[y_dim].values[rand_y[i]]
x = da[x_dim].values[rand_x[i]]
samples.append((y, x, cls))
else:
# if class size is less than a billion, then sample class mask
class_mask = data == cls
flat_indices = np.flatnonzero(class_mask)
# Check if enough pixels exist
if flat_indices.size < sample_size:
if verbose:
print(f"Warning: not enough pixels in class {cls} for given sample size, skipping")
continue
# Randomly sample from those flat indices
sampled = np.random.choice(flat_indices, size=sample_size, replace=False)
# Convert flat indices to (y, x), then to coordinates
for idx in sampled:
y_idx, x_idx = np.unravel_index(idx, data.shape)
y = da[y_dim].values[y_idx]
x = da[x_dim].values[x_idx]
samples.append((y, x, cls))
if len(samples) == 0:
raise RuntimeError("No samples collected. Check input conditions.")
# Add samples to geodataframe
df = pd.DataFrame(samples, columns=["y", "x", "class"])
gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.x, df.y), crs=f"EPSG:{da.odc.crs.epsg}")
gdf = gdf.drop(["x", "y"], axis=1)
if out_fname:
gdf.to_file(out_fname)
return gdf