# classification.py
"""
Machine learning classification tools for analysing remote sensing data
using the Open Data Cube.
License: The code in this notebook is licensed under the Apache License,
Version 2.0 (https://www.apache.org/licenses/LICENSE-2.0). Digital Earth
Australia data is licensed under the Creative Commons by Attribution 4.0
license (https://creativecommons.org/licenses/by/4.0/).
Contact: If you need assistance, please post a question on the Open Data
Cube Discord chat (https://discord.com/invite/4hhBQVas5U) or on the GIS Stack
Exchange (https://gis.stackexchange.com/questions/ask?tags=open-data-cube)
using the `open-data-cube` tag (you can view previously asked questions
here: https://gis.stackexchange.com/questions/tagged/open-data-cube).
If you would like to report an issue with this script, you can file one
on GitHub (https://github.com/GeoscienceAustralia/dea-notebooks/issues/new).
Last modified: May 2021
"""
import os
import sys
import joblib
import numpy as np
import pandas as pd
import xarray as xr
import time
import warnings
from datetime import timedelta
import geopandas as gpd
from copy import deepcopy
from tqdm.auto import tqdm
import multiprocessing as mp
import matplotlib.pyplot as plt
from typing import Callable, Tuple, Any, Optional, List, Dict
import dask.array as da
from dask_ml.wrappers import ParallelPostFit
import dask.distributed as dd
from dask.diagnostics import ProgressBar
from sklearn.cluster import KMeans
from sklearn.utils import check_random_state
from abc import ABCMeta, abstractmethod
from sklearn.base import ClusterMixin
from sklearn.mixture import GaussianMixture
from sklearn.cluster import AgglomerativeClustering
from sklearn.model_selection import KFold, ShuffleSplit
from sklearn.model_selection import BaseCrossValidator
from datacube.utils.geometry import assign_crs
from datacube.utils import geometry
from dea_tools.spatial import xr_rasterize
[docs]
def sklearn_flatten(input_xr):
    """
    Flatten the pixel dimensions of an xarray object into a numpy array
    that scikit-learn models can consume.

    The 'x' and 'y' dimensions (and 'time', when present) are stacked
    into one sample dimension, which becomes the first axis of the
    result. Any sample holding a NaN in any remaining dimension is
    dropped.

    Last modified: September 2019

    Parameters
    ----------
    input_xr : xarray.DataArray or xarray.Dataset
        Must have dimensions 'x' and 'y', may have dimension 'time'.
        Dimensions other than 'x', 'y' and 'time' are unaffected by the
        flattening.

    Returns
    ----------
    input_np : numpy.array
        A numpy array corresponding to input_xr.data (or
        input_xr.to_array().data), with dimensions 'x','y' and 'time'
        flattened into a single dimension, which is the first axis of
        the returned array. input_np contains no NaNs.
    """
    # Datasets are handled by first collapsing them into one DataArray
    if isinstance(input_xr, xr.Dataset):
        input_xr = input_xr.to_array()

    # collapse the pixel dimensions (plus time for a timeseries) into 'z'
    sample_dims = ["x", "y", "time"] if "time" in input_xr.dims else ["x", "y"]
    stacked = input_xr.stack(z=sample_dims)

    # everything that is not 'z' is a per-pixel ('band') dimension; these
    # are kept because their context matters to sklearn
    pxdims = [dim for dim in stacked.dims if dim != "z"]

    # sklearn cannot accept NaNs, so a pixel is rejected when *any* of
    # its bands is NaN
    nan_mask = np.isnan(stacked)
    if pxdims:
        nan_mask = nan_mask.any(dim=pxdims)

    # index with the raw numpy mask rather than the xarray object, and
    # put 'z' on the leading axis so the boolean index lines up with it
    return stacked.transpose("z", *pxdims).data[~nan_mask.data]
[docs]
def sklearn_unflatten(output_np, input_xr):
    """
    Map a flattened, NaN-free numpy array back onto the spatiotemporal
    layout of the xarray object it was derived from.

    This lets the prediction of an sklearn model be placed back on the
    correct pixels of the input DataArray or Dataset.

    Last modified: September 2019

    Parameters
    ----------
    output_np : numpy.array
        The first dimension's length should correspond to the number of
        valid (non-NaN) pixels in input_xr.
    input_xr : xarray.DataArray or xarray.Dataset
        Must have dimensions 'x' and 'y', may have dimension 'time'.
        Dimensions other than 'x', 'y' and 'time' are unaffected by the
        flattening.

    Returns
    ----------
    output_xr : xarray.DataArray
        An xarray.DataArray with the same dimensions 'x', 'y' and 'time'
        as input_xr, and the same valid (non-NaN) pixels. These pixels
        are set to match the data in output_np.
    """
    # Datasets are handled by first collapsing them into one DataArray
    if isinstance(input_xr, xr.Dataset):
        input_xr = input_xr.to_array()

    # rebuild exactly the mask that was applied when flattening
    sample_dims = ["x", "y", "time"] if "time" in input_xr.dims else ["x", "y"]
    stacked = input_xr.stack(z=sample_dims)
    pxdims = [dim for dim in stacked.dims if dim != "z"]
    mask = np.isnan(stacked)
    if pxdims:
        mask = mask.any(dim=pxdims)

    # trailing axes of the model output (empty for single-valued output)
    output_px_shape = tuple(output_np.shape[1:])

    # valid samples receive the model output, the rest stay masked
    output_ma = np.ma.empty((len(stacked.z), *output_px_shape))
    output_ma[~mask] = output_np
    output_ma[mask] = np.ma.masked

    # reuse the stacked coordinate of the input so unstack() restores
    # the original pixel dimensions
    extra_dims = [f"output_dim_{idx}" for idx in range(len(output_px_shape))]
    output_xr = xr.DataArray(
        output_ma,
        coords={"z": stacked["z"]},
        dims=["z", *extra_dims],
    )
    return output_xr.unstack()
[docs]
def fit_xr(model, input_xr):
    """
    Fit a plain sklearn model on the pixels of an xarray object.

    The input is flattened with `sklearn_flatten` and handed to the
    model's fit() method.

    Last modified: September 2019

    Parameters
    ----------
    model : scikit-learn model or compatible object
        Must have a fit() method that takes numpy arrays.
    input_xr : xarray.DataArray or xarray.Dataset.
        Must have dimensions 'x' and 'y', may have dimension 'time'.

    Returns
    ----------
    model : whatever model.fit() returns, i.e. for scikit-learn models
        the model fitted to the data in the pixels of input_xr.
    """
    flattened = sklearn_flatten(input_xr)
    return model.fit(flattened)
[docs]
def predict_xr(
    model,
    input_xr,
    chunk_size=None,
    persist=False,
    proba=False,
    max_proba=True,
    clean=False,
    return_input=False,
):
    """
    Using dask-ml ParallelPostFit(), runs the parallel
    predict and predict_proba methods of sklearn
    estimators. Useful for running predictions
    on larger-than-RAM datasets.

    Last modified: September 2020

    Parameters
    ----------
    model : scikit-learn model or compatible object
        Must have a .predict() method that takes numpy arrays (and a
        .predict_proba() method if proba=True).
    input_xr : xarray.Dataset
        Must have dimensions 'x' and 'y'. The implementation iterates
        over `input_xr.data_vars` and reads `input_xr.geobox.crs`.
    chunk_size : int
        The dask chunk size to use on the flattened array. If this
        is left as None, the product of the first 'x' chunk and the
        first 'y' chunk of `input_xr` is used.
    persist : bool
        If True, and proba=True, then 'input_xr' data will be
        loaded into distributed memory. This will ensure data
        is not loaded twice for the prediction of probabilities,
        but this will only work if the data is not larger than
        distributed RAM.
    proba : bool
        If True, predict probabilities
    max_proba : bool
        If True, a single "Probabilities" variable holding the largest
        class probability of each pixel is added. If False, one
        variable per class is added, named "prob_<class>" after the
        entries of `model.classes_`. Probabilities are scaled by 100.
    clean : bool
        If True, replace Infs and NaNs in the input features, the
        predictions and the probabilities with 0.
    return_input : bool
        If True, then the data variables in the 'input_xr' dataset will
        be appended to the output xarray dataset.

    Returns
    ----------
    output_xr : xarray.Dataset
        An xarray.Dataset containing the prediction output from model
        in the variable "Predictions". If proba=True the dataset also
        contains probabilities, and if return_input=True it contains the
        input feature layers. Has the same 'x' and 'y' structure as
        input_xr.
    """
    # if input_xr isn't dask, coerce it into a single spatial chunk
    is_dask = bool(input_xr.chunks)
    if not is_dask:
        input_xr = input_xr.chunk({"x": len(input_xr.x), "y": len(input_xr.y)})

    # set chunk size if not supplied
    if chunk_size is None:
        chunk_size = int(input_xr.chunks["x"][0]) * int(input_xr.chunks["y"][0])

    def _predict_func(model, input_xr, persist, proba, max_proba, clean, return_input):
        x, y, crs = input_xr.x, input_xr.y, input_xr.geobox.crs

        # flatten every data variable into a 1D dask array of pixels
        input_data_flattened = [
            input_xr[var_name].data.flatten().rechunk(chunk_size)
            for var_name in input_xr.data_vars
        ]

        # reshape for prediction: (n_pixels, n_variables)
        input_data_flattened = da.array(input_data_flattened).transpose()

        if clean:
            input_data_flattened = da.where(
                da.isfinite(input_data_flattened), input_data_flattened, 0
            )

        if proba and persist:
            # persisting data so we don't require loading all the data twice
            input_data_flattened = input_data_flattened.persist()

        # apply the classification
        print("predicting...")
        out_class = model.predict(input_data_flattened)

        # Mask out NaN or Inf values in results
        if clean:
            out_class = da.where(da.isfinite(out_class), out_class, 0)

        # Reshape when writing out
        out_class = out_class.reshape(len(y), len(x))

        # stack back into xarray
        output_xr = xr.DataArray(
            out_class, coords={"x": x, "y": y}, dims=["y", "x"]
        )
        output_xr = output_xr.to_dataset(name="Predictions")

        if proba:
            print(" probabilities...")
            out_proba = model.predict_proba(input_data_flattened)

            # return either one band with the max probability, or the
            # whole probability array
            if max_proba:
                print(" returning single probability band.")
                out_proba = da.max(out_proba, axis=1) * 100.0
            else:
                print(" returning class probability array.")
                out_proba = out_proba * 100.0

            # Cleaning has to happen *before* the probabilities are
            # reshaped and stored in the output dataset; doing it
            # afterwards only rebinds a local name and has no effect.
            if clean:
                out_proba = da.where(da.isfinite(out_proba), out_proba, 0)

            if max_proba:
                out_proba = out_proba.reshape(len(y), len(x))
                output_xr["Probabilities"] = xr.DataArray(
                    out_proba, coords={"x": x, "y": y}, dims=["y", "x"]
                )
            else:
                # one output variable per class known to the classifier
                class_names = model.classes_
                probabilities_dataset = xr.Dataset()
                for i, class_name in enumerate(class_names):
                    reshaped_band = out_proba[:, i].reshape(len(y), len(x))
                    probabilities_dataset[f"prob_{class_name}"] = xr.DataArray(
                        reshaped_band, coords={"x": x, "y": y}, dims=["y", "x"]
                    )
                # merge in the probabilities
                output_xr = xr.merge([output_xr, probabilities_dataset])

        if return_input:
            print(" input features...")
            # unflatten the input_data_flattened array and append
            # to the output_xr containing the predictions
            arr = input_xr.to_array()
            stacked = arr.stack(z=["y", "x"])

            # handle multivariable output
            output_px_shape = tuple(input_data_flattened.shape[1:])

            output_features = input_data_flattened.reshape(
                (len(stacked.z), *output_px_shape)
            )

            # set the stacked coordinate to match the input
            output_features = xr.DataArray(
                output_features,
                coords={"z": stacked["z"]},
                dims=[
                    "z",
                    *[
                        "output_dim_" + str(idx)
                        for idx in range(len(output_px_shape))
                    ],
                ],
            ).unstack()

            # convert to dataset and rename arrays
            output_features = output_features.to_dataset(dim="output_dim_0")
            data_vars = list(input_xr.data_vars)
            output_features = output_features.rename(
                dict(zip(output_features.data_vars, data_vars))
            )

            # merge with predictions
            output_xr = xr.merge(
                [output_xr, output_features], compat="override"
            )

        return assign_crs(output_xr, str(crs))

    if is_dask:
        # convert model to dask predict
        model = ParallelPostFit(model)
        with joblib.parallel_backend("dask"):
            output_xr = _predict_func(
                model, input_xr, persist, proba, max_proba, clean, return_input
            )
    else:
        # the input was not dask-backed, so return loaded data
        output_xr = _predict_func(
            model, input_xr, persist, proba, max_proba, clean, return_input
        ).compute()

    return output_xr
[docs]
class HiddenPrints:
    """
    Context manager for concealing unwanted print statements called by
    other functions.

    While the block is active `sys.stdout` points at `os.devnull`; on
    exit the stream that was active on entry is restored.
    """

    def __enter__(self):
        self._original_stdout = sys.stdout
        # Keep our own reference to the sink so __exit__ closes exactly
        # the handle opened here, even if code inside the block has
        # rebound sys.stdout to something else in the meantime.
        self._devnull = open(os.devnull, "w")
        sys.stdout = self._devnull
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            self._devnull.close()
        finally:
            # always give stdout back, even if closing the sink failed
            sys.stdout = self._original_stdout
def _get_training_data_for_shp(
    gdf: gpd.GeoDataFrame,
    index: int,
    row: gpd.GeoSeries,
    out_arrs: List[np.ndarray],
    out_vars: List[List[str]],
    dc_query: Dict,
    return_coords: bool,
    feature_func: Optional[Callable] = None,
    field: Optional[str] = None,
    zonal_stats: Optional[str] = None,
    time_field: Optional[str] = None,
    time_delta: Optional[timedelta] = None,
):
    """
    This is the core function that is triggered by `collect_training_data`.
    The `collect_training_data` function loops through geometries in a geopandas
    geodataframe and runs the code within `_get_training_data_for_shp`.
    Parameters are inherited from `collect_training_data`.
    See that function for information on the other params not listed below.

    Parameters
    ----------
    gdf : gpd.GeoDataFrame
        Geopandas GeoDataFrame containing geometries. Must contain an
        'id' column; its value for the current feature is written into
        an 'id' variable of the extracted data.
    index : int
        Index *label* of the current geometry in the GeoDataFrame, as
        yielded by `gdf.iterrows()`. Labels are assumed to be unique.
    row : gpd.GeoSeries
        GeoSeries representing the current row in the GeoDataFrame.
    out_arrs : List[np.ndarray]
        List to which the training data array of this feature is appended.
    out_vars : List[List[str]]
        List to which the column names of this feature are appended.
    dc_query : Dict
        ODC query. It is deep-copied, so the caller's dict is not altered.
    return_coords : bool
        Flag indicating whether to add 'x_coord' and 'y_coord' variables.
    feature_func : callable, optional
        Function called with the query that returns the data to sample.
    field : str, optional
        Name of the class field. Defaults to None.
    zonal_stats : str, optional
        Zonal statistics method ('mean', 'median', 'max' or 'min').
        Defaults to None, in which case all pixels are extracted.
    time_field : str, optional
        Name of the column containing timestamp data in the input gdf. Defaults to None.
    time_delta : timedelta, optional
        Time delta used to match a data point with all the scenes falling between
        `time_stamp - time_delta` and `time_stamp + time_delta`. Defaults to None.

    Returns
    --------
    None. The stacked array (class label first, then the data variables)
    is appended to `out_arrs` and the matching column names to
    `out_vars`. Nothing is appended if `feature_func` returns no data.

    Raises
    ------
    ValueError
        If the data still has more than one time-step and `time_delta`
        is set.
    Exception
        If `zonal_stats` is not one of the supported reductions.
    """
    # prevent function altering dictionary kwargs
    dc_query = deepcopy(dc_query)

    # remove dask chunks if supplied as using
    # multiprocessing for parallelization
    dc_query.pop("dask_chunks", None)

    # `index` is a label coming from gdf.iterrows(), so the feature has
    # to be selected by label; positional (.iloc) selection would pick a
    # different row whenever the index is not 0..n-1 in order.
    feature = gdf.loc[[index]]

    # set up query based on polygon and merge it with the user query
    geom = geometry.Geometry(geom=feature.geometry.iloc[0], crs=gdf.crs)
    dc_query.update({"geopolygon": geom})

    # Update time range if a time window is specified
    if time_delta is not None:
        timestamp = feature[time_field].iloc[0]
        dc_query.update(
            {"time": (timestamp - time_delta, timestamp + time_delta)}
        )

    # Use input feature function
    data = feature_func(dc_query)

    # if no data is present then return
    if len(data) == 0:
        return

    if feature.geometry.geom_type.iloc[0] != "Point":
        # If the geometry type is a polygon extract all pixels
        # inside it by masking with the rasterized polygon
        mask = xr_rasterize(feature, data)
        data = data.where(mask)

    # Check that feature_func has removed time
    if "time" in data.dims:
        # .sizes maps dimension name -> length for Datasets and DataArrays
        t = data.sizes["time"]
        # NOTE(review): this only rejects multi-timestep data when a
        # time_delta is given; confirm that is the intended condition.
        if t > 1 and time_delta is not None:
            raise ValueError(
                "After running the feature_func, the dataset still has "
                + str(t)
                + " time-steps, dataset must only have"
                + " x and y dimensions."
            )

    if return_coords:
        # turn coords into a variable in the ds (broadcast to 2D)
        data["x_coord"] = data.x + 0 * data.y
        data["y_coord"] = data.y + 0 * data.x

    # append ID measurement to dataset for tracking failures
    band = next(iter(data.data_vars))
    data["id"] = xr.zeros_like(data[band]) + feature["id"].iloc[0]

    # If no zonal stats were requested then extract all pixel values
    if zonal_stats is None:
        flat_train = sklearn_flatten(data)
        flat_val = np.repeat(row[field], flat_train.shape[0])
        stacked = np.hstack((np.expand_dims(flat_val, axis=1), flat_train))
    elif zonal_stats in ["mean", "median", "max", "min"]:
        # reduce every variable to one value with the requested method
        flat_train = getattr(data, zonal_stats)().to_array()
        stacked = np.hstack((row[field], flat_train))
    else:
        raise Exception(
            zonal_stats
            + " is not one of the supported"
            + " reduce functions ('mean','median','max','min')"
        )

    out_arrs.append(stacked)
    out_vars.append([field] + list(data.data_vars))
def _get_training_data_parallel(
    gdf: gpd.GeoDataFrame,
    dc_query: dict,
    ncpus: int,
    return_coords: bool,
    feature_func: Optional[Callable] = None,
    field: Optional[str] = None,
    zonal_stats: Optional[str] = None,
    time_field: Optional[str] = None,
    time_delta: Optional[timedelta] = None,
) -> Tuple[List[List[str]], List[Any]]:
    """
    Function passing the '_get_training_data_for_shp' function
    to a multiprocessing.Pool.
    Inherits variables from 'collect_training_data()'.

    Returns
    -------
    column_names, results
        Two manager-backed lists filled by the worker processes: the
        column names appended for each feature, and the extracted
        training data arrays.

    Raises
    ------
    ValueError
        If a Dask client is currently running.
    """
    # Check if dask-client is running. Only ordinary errors are treated
    # as "no client"; KeyboardInterrupt/SystemExit must still propagate.
    try:
        client = dd.get_client()
    except Exception:
        client = None

    if client is not None:
        raise ValueError(
            "You have a Dask Client running, which prevents \n"
            "this function from multiprocessing. Close the client."
        )

    # instantiate lists that can be shared across processes
    manager = mp.Manager()
    results = manager.list()
    column_names = manager.list()

    # progress bar
    pbar = tqdm(total=len(gdf))

    def update(*a):
        pbar.update()

    def report_failure(exc):
        # Without an error callback a worker exception is dropped
        # silently and the progress bar never reaches its total.
        warnings.warn(
            "Collecting training data failed for a feature: " + repr(exc)
        )
        pbar.update()

    with mp.Pool(ncpus) as pool:
        for index, row in gdf.iterrows():
            pool.apply_async(
                _get_training_data_for_shp,
                [
                    gdf,
                    index,
                    row,
                    results,
                    column_names,
                    dc_query,
                    return_coords,
                    feature_func,
                    field,
                    zonal_stats,
                    time_field,
                    time_delta,
                ],
                callback=update,
                error_callback=report_failure,
            )

        # wait for every queued task before the pool is torn down
        pool.close()
        pool.join()

    pbar.close()
    return column_names, results
[docs]
def collect_training_data(
    gdf: gpd.GeoDataFrame,
    dc_query: dict,
    ncpus: int = 1,
    return_coords: bool = False,
    feature_func: Optional[Callable] = None,
    field: Optional[str] = None,
    zonal_stats: Optional[str] = None,
    clean: bool = True,
    fail_threshold: float = 0.02,
    fail_ratio: float = 0.5,
    max_retries: int = 3,
    time_field: Optional[str] = None,
    time_delta: Optional[timedelta] = None,
) -> Tuple[List[str], np.ndarray]:
    """
    This function provides methods for gathering training data from the ODC over
    geometries stored within a geopandas geodataframe. The function will return a
    'model_input' array containing stacked training data arrays with all NaNs & Infs removed.
    In the instance where ncpus > 1, a parallel version of the function will be run
    (functions are passed to a mp.Pool()). This function can conduct zonal statistics if
    the supplied shapefile contains polygons. The 'feature_func' parameter defines what
    features to produce.

    Parameters
    ----------
    gdf : geopandas geodataframe
        geometry data in the form of a geopandas geodataframe. The input
        is not modified; the function works on a re-indexed copy.
    dc_query : dictionary
        Datacube query object, should not contain lat and long (x or y)
        variables as these are supplied by the 'gdf' variable
    ncpus : int
        The number of cpus/processes over which to parallelize the gathering
        of training data (only if ncpus is > 1). Use 'mp.cpu_count()' to determine the number of
        cpus available on a machine. Defaults to 1.
    return_coords : bool
        If True, then the training data will contain two extra columns 'x_coord' and
        'y_coord' corresponding to the x,y coordinate of each sample. This variable can
        be useful for handling spatial autocorrelation between samples later in the ML workflow.
    feature_func : function
        A function for generating feature layers that is applied to the data within
        the bounds of the input geometry. The 'feature_func' must accept a 'dc_query'
        object, and return a single xarray.Dataset or xarray.DataArray containing
        2D coordinates (i.e x, y - no time dimension).
        e.g.
            def feature_function(query):
                dc = datacube.Datacube(app='feature_layers')
                ds = dc.load(**query)
                ds = ds.mean('time')
                return ds
    field : str
        Name of the column in the gdf that contains the class labels.
        The column must have an integer dtype.
    zonal_stats : string, optional
        An optional string giving the names of zonal statistics to calculate
        for each polygon. Default is None (all pixel values are returned). Supported
        values are 'mean', 'median', 'max', 'min'.
    clean : bool
        Whether or not to remove missing values in the training dataset. If True,
        training labels with any NaNs or Infs in the feature layers will be dropped
        from the dataset.
    fail_threshold : float, default 0.02
        Silent read fails on S3 can result in some rows of the returned data containing NaN values.
        The 'fail_threshold' fraction specifies a % of acceptable fails.
        e.g. Setting 'fail_threshold' to 0.05 means if >5% of the samples in the training dataset
        fail then those samples will be returned to the multiprocessing queue. Below this fraction
        the function will accept the failures and return the results.
    fail_ratio: float
        A float between 0 and 1 that defines if a given training sample has failed.
        Default is 0.5, which means if more than 50 % of the measurements in a given
        sample return null values, and the number of total fails is more than the
        fail_threshold, the sample will be passed to the retry queue.
    max_retries: int, default 3
        Maximum number of times to retry collecting samples. Retrying stops
        earlier once the fail rate no longer exceeds 'fail_threshold'.
        Retries only happen in parallel mode (ncpus > 1).
    time_field: str
        The name of the attribute in the input dataframe containing capture timestamp
    time_delta: time_delta
        The size of the window used as timestamp +/- time_delta.
        This is used to allow matching a single field data point with multiple scenes

    Returns
    --------
    column_names : list of str
        The name of each column of 'model_input': the class field first,
        followed by the data variable names.
    model_input : numpy.ndarray
        The stacked training data, one row per pixel or polygon.

    Raises
    ------
    ValueError
        If the 'field' column is not of integer dtype, or if no
        'feature_func' is supplied.
    """
    # check the dtype of the class field; comparing against the builtin
    # `int` only matches the platform default integer width, so any
    # integer dtype is accepted here instead
    if not pd.api.types.is_integer_dtype(gdf[field]):
        raise ValueError(
            'The "field" column of the input vector must contain integer dtypes'
        )

    # check for feature_func
    if feature_func is None:
        raise ValueError(
            "Please supply a feature layer function through the "
            + "parameter 'feature_func'"
        )

    if zonal_stats is not None:
        print("Taking zonal statistic: " + zonal_stats)

    # Work on a re-indexed copy: the caller's frame keeps its columns
    # (an existing 'id' column is not overwritten) and index labels are
    # guaranteed to equal row positions for the per-feature lookups.
    gdf = gdf.reset_index(drop=True)

    # add unique id to gdf to help with indexing failed rows
    # during multiprocessing
    gdf["id"] = range(0, len(gdf))

    if ncpus == 1:
        # progress indicator
        print("Collecting training data in serial mode")

        # lists to store results
        results = []
        column_names = []

        # loop through polys and extract training data
        for i, (index, row) in enumerate(gdf.iterrows()):
            print(" Feature {:04}/{:04}\r".format(i + 1, len(gdf)), end="")
            _get_training_data_for_shp(
                gdf,
                index,
                row,
                results,
                column_names,
                dc_query,
                return_coords,
                feature_func,
                field,
                zonal_stats,
                time_field,
                time_delta,
            )
    else:
        print("Collecting training data in parallel mode")
        column_names, results = _get_training_data_parallel(
            gdf=gdf,
            dc_query=dc_query,
            ncpus=ncpus,
            return_coords=return_coords,
            feature_func=feature_func,
            field=field,
            zonal_stats=zonal_stats,
            time_field=time_field,
            time_delta=time_delta,
        )

    # column names are appended during each iteration
    # but they are identical, grab only the first instance
    column_names = column_names[0]

    # Stack the extracted training data for each feature into a single array
    model_input = np.vstack(results)

    # this code block below iteratively retries failed rows
    # up to max_retries or until fail_threshold is
    # reached - whichever occurs first
    if ncpus > 1:
        for attempt in range(1, max_retries + 1):
            # Find % of fails (null values) in data. Use Pandas for
            # simplicity; the last column holds the feature id.
            df = pd.DataFrame(
                data=model_input[:, 0:-1], index=model_input[:, -1]
            )

            # how many nan values per id?
            num_nans = df.isnull().sum(axis=1)
            num_nans = num_nans.groupby(num_nans.index).sum()

            # how many valid values per id?
            num_valid = df.notnull().sum(axis=1)
            num_valid = num_valid.groupby(num_valid.index).sum()

            # find fail rate
            perc_fail = num_nans / (num_nans + num_valid)
            fail_ids = perc_fail[perc_fail > fail_ratio]
            fail_rate = len(fail_ids) / len(gdf)

            print(
                "Percentage of possible fails after run "
                + str(attempt)
                + " = "
                + str(round(fail_rate * 100, 2))
                + " %"
            )

            if fail_rate <= fail_threshold:
                break

            print("Recollecting samples that failed")
            fail_ids = list(fail_ids.index)

            # keep only the ids in model_input object that didn't fail
            model_input = model_input[
                ~np.isin(model_input[:, -1], fail_ids)
            ]

            # index out the fail_ids from the original gdf
            gdf_rerun = gdf.loc[gdf["id"].isin(fail_ids)]
            gdf_rerun = gdf_rerun.reset_index(drop=True)

            time.sleep(5)  # sleep for 5s to rest api

            # recollect failed rows
            _, results_again = _get_training_data_parallel(
                gdf=gdf_rerun,
                dc_query=dc_query,
                ncpus=ncpus,
                return_coords=return_coords,
                feature_func=feature_func,
                field=field,
                zonal_stats=zonal_stats,
                time_field=time_field,
                time_delta=time_delta,
            )

            # merge results of the re-run with the original run; the
            # re-run may legitimately return nothing, and np.vstack
            # cannot stack an empty sequence
            if len(results_again) > 0:
                model_input = np.vstack(
                    (model_input, np.vstack(results_again))
                )

    # -----------------------------------------------

    # remove id column (always the last one)
    column_names = column_names[0:-1]
    model_input = model_input[:, :-1]

    if clean:
        # a row is kept only when every value is finite, so the reported
        # count covers rows dropped for NaNs as well as for Infs
        valid_rows = np.isfinite(model_input).all(axis=1)
        num = np.count_nonzero(~valid_rows)
        model_input = model_input[valid_rows]
        print("Removed " + str(num) + " rows wth NaNs &/or Infs")
        print("Output shape: ", model_input.shape)
    else:
        print("Returning data without cleaning")
        print("Output shape: ", model_input.shape)

    return column_names, model_input
[docs]
class KMeans_tree(ClusterMixin):
    """
    A hierarchical KMeans unsupervised clustering model. This class is
    a clustering model, so it inherits scikit-learn's ClusterMixin
    base class.

    Every node of the tree holds one KMeans model; each of its clusters
    is refined by a child tree with one level less. Final labels encode
    the path through the tree as digits in base `n_clusters`.

    Parameters
    ----------
    n_levels : integer, default 2
        number of levels in the tree of clustering models.
    n_clusters : integer, default 3
        Number of clusters in each of the constituent KMeans models in
        the tree.
    **kwargs : optional
        Other keyword arguments to be passed directly to the KMeans
        initialiser.
    """

    def __init__(self, n_levels=2, n_clusters=3, **kwargs):
        assert n_levels >= 1

        # the node's own model must use the requested cluster count;
        # the branches and the label arithmetic both assume it
        self.base_model = KMeans(n_clusters=n_clusters, **kwargs)
        self.n_levels = n_levels
        self.n_clusters = n_clusters

        # make child models, one per cluster of this node
        if n_levels > 1:
            self.branches = [
                KMeans_tree(
                    n_levels=n_levels - 1, n_clusters=n_clusters, **kwargs
                )
                for _ in range(n_clusters)
            ]

    def fit(self, X, y=None, sample_weight=None):
        """
        Fit the tree of KMeans models. All parameters mimic those
        of KMeans.fit().

        Parameters
        ----------
        X : array-like or sparse matrix, shape=(n_samples, n_features)
            Training instances to cluster. It must be noted that the
            data will be converted to C ordering, which will cause a
            memory copy if the given data is not C-contiguous.
        y : Ignored
            not used, present here for API consistency by convention.
        sample_weight : array-like, shape (n_samples,), optional
            The weights for each observation in X. If None, all
            observations are assigned equal weight (default: None)

        Returns
        -------
        self : the fitted model, with `labels_` set.
        """
        self.labels_ = self.base_model.fit(
            X, sample_weight=sample_weight
        ).labels_

        if self.n_levels > 1:
            labels_old = np.copy(self.labels_)

            # make room to add the sub-cluster labels
            self.labels_ *= (self.n_clusters) ** (self.n_levels - 1)

            for clu in range(self.n_clusters):
                in_cluster = labels_old == clu
                # fit child models on their corresponding partition of
                # the training set
                self.branches[clu].fit(
                    X[in_cluster],
                    sample_weight=(
                        sample_weight[in_cluster]
                        if sample_weight is not None
                        else None
                    ),
                )
                self.labels_[in_cluster] += self.branches[clu].labels_

        return self

    def predict(self, X, sample_weight=None):
        """
        Send X through the KMeans tree and predict the resultant
        cluster. Compatible with KMeans.predict().

        Parameters
        ----------
        X : {array-like, sparse matrix}, shape = [n_samples, n_features]
            New data to predict.
        sample_weight : array-like, shape (n_samples,), optional
            Accepted for backwards compatibility only and not used:
            it is not forwarded to KMeans.predict(), which no longer
            takes this argument in recent scikit-learn releases.

        Returns
        -------
        labels : array, shape [n_samples,]
            Index of the cluster each sample belongs to.
        """
        result = self.base_model.predict(X)

        if self.n_levels > 1:
            rescpy = np.copy(result)

            # make room to add the sub-cluster labels
            result *= (self.n_clusters) ** (self.n_levels - 1)

            for clu in range(self.n_clusters):
                in_cluster = rescpy == clu
                # new data need not populate every cluster, and the
                # child model cannot predict on zero samples
                if not in_cluster.any():
                    continue
                result[in_cluster] += self.branches[clu].predict(
                    X[in_cluster]
                )

        return result
[docs]
def spatial_clusters(
    coordinates,
    method="Hierarchical",
    max_distance=None,
    n_groups=None,
    verbose=False,
    **kwargs
):
    """
    Assign coordinate data to spatial groups using agglomerative
    (hierarchical) clustering, KMeans clustering or a Gaussian Mixture
    model.

    Last modified: September 2020

    Parameters
    ----------
    n_groups : int
        The number of groups to create. This is passed as 'n_clusters=n_groups'
        for the KMeans algo, and 'n_components=n_groups' for the GMM. If using
        method='Hierarchical' then this parameter is ignored.
    coordinates : np.array
        A numpy array of coordinate values e.g.
        np.array([[3337270., 262400.],
                  [3441390., -273060.], ...])
    method : str
        Which algorithm to use to separate data points. Either 'KMeans', 'GMM', or
        'Hierarchical'. If using 'Hierarchical' then must set max_distance.
    max_distance : int
        If method is set to 'Hierarchical' then this is passed as the
        'distance_threshold' of a complete-linkage agglomerative
        clustering. 'n_groups' is ignored in this case.
    verbose : bool
        If True, print the number of distinct clusters found.
    **kwargs : optional,
        Additional keyword arguments to pass to the constructor of the
        clustering model selected by the 'method' argument.

    Returns
    -------
    labels : array, shape [n_samples,]
        Index of the cluster each sample belongs to.
    """
    # validate the arguments before building any model
    if method not in ("Hierarchical", "KMeans", "GMM"):
        raise ValueError(
            "Method must be one of: 'Hierarchical','KMeans' or 'GMM'"
        )

    if method != "Hierarchical" and n_groups is None:
        raise ValueError(
            "The 'GMM' and 'KMeans' methods requires explicitly setting 'n_groups'"
        )

    if method == "Hierarchical" and max_distance is None:
        raise ValueError(
            "The 'Hierarchical' method requires setting max_distance"
        )

    # build the model that matches the requested method
    if method == "Hierarchical":
        clusterer = AgglomerativeClustering(
            n_clusters=None,
            linkage="complete",
            distance_threshold=max_distance,
            **kwargs
        )
    elif method == "KMeans":
        clusterer = KMeans(n_clusters=n_groups, **kwargs)
    else:
        clusterer = GaussianMixture(n_components=n_groups, **kwargs)

    cluster_label = clusterer.fit_predict(coordinates)

    if verbose:
        print("n clusters = " + str(len(np.unique(cluster_label))))

    return cluster_label
[docs]
def SKCV(
    coordinates,
    n_splits,
    cluster_method,
    kfold_method,
    test_size,
    balance,
    n_groups=None,
    max_distance=None,
    train_size=None,
    random_state=None,
    **kwargs
):
    """
    Build a spatial k-fold cross-validator from coordinate data.

    This function wraps the '_SpatialShuffleSplit' and '_SpatialKFold'
    classes. These classes ingest coordinate data in the form of an
    np.array([[eastings, northings]]) and assign samples to a spatial cluster
    using either a KMeans, Gaussian Mixture, or Agglomerative Clustering
    algorithm.

    This cross-validator is preferred over other sklearn.model_selection
    methods for spatial data to avoid overestimating cross-validation scores,
    which can happen because of the spatial autocorrelation that is usually
    associated with this type of data.

    Last modified: Dec 2020

    Parameters
    ----------
    coordinates : np.array
        A numpy array of coordinate values e.g.
        np.array([[3337270.,  262400.],
                  [3441390., -273060.], ...])
    n_splits : int
        The number of test-train cross validation splits to generate.
    cluster_method : str
        Which algorithm to use to separate data points. Either 'KMeans',
        'GMM', or 'Hierarchical'.
    kfold_method : str
        One of either 'SpatialShuffleSplit' or 'SpatialKFold'. See the docs
        under class:_SpatialShuffleSplit and class:_SpatialKFold for more
        information on these options.
    test_size : float, int, None
        If float, should be between 0.0 and 1.0 and represent the proportion
        of the dataset to include in the test split. If int, represents the
        absolute number of test samples. If None, the value is set to the
        complement of the train size.
    balance : int or bool
        if setting kfold_method to 'SpatialShuffleSplit': int
            The number of splits generated per iteration to try to balance
            the amount of data in each set so that *test_size* and
            *train_size* are respected. If 1, then no extra splits are
            generated (essentially disabling the balancing). Must be >= 1.
        if setting kfold_method to 'SpatialKFold': bool
            Whether or not to split clusters into folds with approximately
            equal number of data points. If False, each fold will have the
            same number of clusters (which can have different number of data
            points in them).
    n_groups : int
        The number of groups to create. This is passed as
        'n_clusters=n_groups' for the KMeans algo, and
        'n_components=n_groups' for the GMM. If using
        cluster_method='Hierarchical' then this parameter is ignored.
    max_distance : int
        If cluster_method is 'Hierarchical' then maximum distance describes
        the maximum euclidean distances between all observations in a
        cluster. 'n_groups' is ignored in this case.
    train_size : float, int, or None
        If float, should be between 0.0 and 1.0 and represent the
        proportion of the dataset to include in the train split. If
        int, represents the absolute number of train samples. If None,
        the value is automatically set to the complement of the test size.
        Only forwarded when kfold_method is 'SpatialShuffleSplit'.
    random_state : int, RandomState instance or None, optional (default=None)
        If int, random_state is the seed used by the random number generator;
        If RandomState instance, random_state is the random number generator;
        If None, the random number generator is the RandomState instance used
        by `np.random`.
    **kwargs : optional,
        Additional keyword arguments to pass to sklearn.cluster.KMeans or
        sklearn.mixture.GaussianMixture depending on the cluster_method
        argument.

    Returns
    -------
    splitter : _SpatialShuffleSplit or _SpatialKFold
        The configured cross-validator. Call its ``split`` method with the
        coordinates to obtain the train/test index generator.

    Raises
    ------
    ValueError
        If 'kfold_method' is not 'SpatialShuffleSplit' or 'SpatialKFold'.
    """
    # Validate up front: without this an unknown method would reach the
    # return statement with no splitter bound and fail with an opaque error.
    if kfold_method not in ("SpatialShuffleSplit", "SpatialKFold"):
        raise ValueError(
            "kfold_method must be one of: 'SpatialShuffleSplit' or 'SpatialKFold'"
        )

    if kfold_method == "SpatialShuffleSplit":
        return _SpatialShuffleSplit(
            n_groups=n_groups,
            method=cluster_method,
            coordinates=coordinates,
            max_distance=max_distance,
            test_size=test_size,
            train_size=train_size,
            n_splits=n_splits,
            random_state=random_state,
            balance=balance,
            **kwargs
        )

    # kfold_method == "SpatialKFold"
    return _SpatialKFold(
        n_groups=n_groups,
        coordinates=coordinates,
        max_distance=max_distance,
        method=cluster_method,
        test_size=test_size,
        n_splits=n_splits,
        random_state=random_state,
        balance=balance,
        **kwargs
    )
[docs]
def spatial_train_test_split(
    X,
    y,
    coordinates,
    cluster_method,
    kfold_method,
    balance,
    test_size=None,
    n_splits=None,
    n_groups=None,
    max_distance=None,
    train_size=None,
    random_state=None,
    **kwargs
):
    """
    Split arrays into random train and test subsets. Similar to
    `sklearn.model_selection.train_test_split` but instead works on
    spatial coordinate data. Coordinate data is grouped according
    to either a KMeans, Gaussian Mixture, or Agglomerative Clustering
    algorithm.

    Grouping by spatial clusters is preferred over plain random splits for
    spatial data to avoid overestimating validation scores due to spatial
    autocorrelation.

    Only the first train/test split produced by the underlying
    cross-validator is used to build the returned arrays.

    Parameters
    ----------
    X : np.array
        Training data features. Indexed as ``X[indices, :]``, so it must be
        two-dimensional.
    y : np.array
        Training data labels
    coordinates : np.array
        A numpy array of coordinate values e.g.
        np.array([[3337270.,  262400.],
                  [3441390., -273060.], ...])
    cluster_method : str
        Which algorithm to use to separate data points. Either 'KMeans',
        'GMM', or 'Hierarchical'
    kfold_method : str
        One of either 'SpatialShuffleSplit' or 'SpatialKFold'. See the docs
        under class:_SpatialShuffleSplit and class:_SpatialKFold for more
        information on these options.
    balance : int or bool
        if setting kfold_method to 'SpatialShuffleSplit': int
            The number of splits generated per iteration to try to balance
            the amount of data in each set so that *test_size* and
            *train_size* are respected. If 1, then no extra splits are
            generated (essentially disabling the balancing). Must be >= 1.
        if setting kfold_method to 'SpatialKFold': bool
            Whether or not to split clusters into folds with approximately
            equal number of data points. If False, each fold will have the
            same number of clusters (which can have different number of data
            points in them).
    test_size : float, int, None
        If float, should be between 0.0 and 1.0 and represent the proportion
        of the dataset to include in the test split. If int, represents the
        absolute number of test samples. If None, the value is set to the
        complement of the train size. Not forwarded for 'SpatialKFold'.
    n_splits : int
        Required for the 'SpatialKFold' folding method; use this number to
        satisfy the train-test size ratio desired, as the 'test_size'
        parameter for the KFold method often fails to get the ratio right.
        For 'SpatialShuffleSplit' it defaults to 1 when None.
    n_groups : int
        The number of groups to create. This is passed as
        'n_clusters=n_groups' for the KMeans algo, and
        'n_components=n_groups' for the GMM. If using
        cluster_method='Hierarchical' then this parameter is ignored.
    max_distance : int
        If cluster_method is 'Hierarchical' then maximum distance describes
        the maximum euclidean distances between all observations in a
        cluster. 'n_groups' is ignored in this case.
    train_size : float, int, or None
        If float, should be between 0.0 and 1.0 and represent the
        proportion of the dataset to include in the train split. If
        int, represents the absolute number of train samples. If None,
        the value is automatically set to the complement of the test size.
        Not forwarded for 'SpatialKFold'.
    random_state : int, RandomState instance or None, optional
        If int, random_state is the seed used by the random number generator;
        If RandomState instance, random_state is the random number generator;
        If None, the random number generator is the RandomState instance used
        by `np.random`.
    **kwargs : optional,
        Additional keyword arguments to pass to sklearn.cluster.KMeans or
        sklearn.mixture.GaussianMixture depending on the cluster_method
        argument.

    Returns
    -------
    Tuple :
        Contains four arrays in the following order:
            X_train, X_test, y_train, y_test

    Raises
    ------
    ValueError
        If 'kfold_method' is not recognised, or if 'SpatialKFold' is
        requested without 'n_splits'.
    """
    # Reject unknown methods early; otherwise no splitter would be bound and
    # the failure would surface later as an unrelated error.
    if kfold_method not in ("SpatialShuffleSplit", "SpatialKFold"):
        raise ValueError(
            "kfold_method must be one of: 'SpatialShuffleSplit' or 'SpatialKFold'"
        )

    if kfold_method == "SpatialShuffleSplit":
        splitter = _SpatialShuffleSplit(
            n_groups=n_groups,
            method=cluster_method,
            coordinates=coordinates,
            max_distance=max_distance,
            test_size=test_size,
            train_size=train_size,
            n_splits=1 if n_splits is None else n_splits,
            random_state=random_state,
            balance=balance,
            **kwargs
        )
    else:
        if n_splits is None:
            raise ValueError(
                "n_splits parameter requires an integer value, eg. 'n_splits=5'"
            )
        if (test_size is not None) or (train_size is not None):
            warnings.warn(
                "With the 'SpatialKFold' method, controlling the test/train ratio "
                "is better achieved using the 'n_splits' parameter"
            )
        splitter = _SpatialKFold(
            n_groups=n_groups,
            coordinates=coordinates,
            max_distance=max_distance,
            method=cluster_method,
            n_splits=n_splits,
            random_state=random_state,
            balance=balance,
            **kwargs
        )

    # Only the first split is ever returned, so stop after producing it
    # instead of materialising every fold.
    train, test = next(iter(splitter.split(coordinates)))
    return (X[train, :], X[test, :], y[train], y[test])
def _partition_by_sum(array, parts):
"""
Partition an array into parts of approximately equal sum.
Does not change the order of the array elements.
Produces the partition indices on the array. Use :func:`numpy.split` to
divide the array along these indices.
Parameters
----------
array : array or array-like
The 1D array that will be partitioned. The array will be raveled before
computations.
parts : int
Number of parts to split the array. Can be at most the number of
elements in the array.
Returns
-------
indices : array
The indices in which the array should be split.
Notes
-----
Solution from https://stackoverflow.com/a/54024280
"""
array = np.atleast_1d(array).ravel()
if parts > array.size:
raise ValueError(
"Cannot partition an array of size {} into {} parts of equal sum.".format(
array.size, parts
)
)
cumulative_sum = array.cumsum()
# Ideally, we want each part to have the same number of points (total /
# parts).
ideal_sum = cumulative_sum[-1] // parts
# If the parts are ideal, the cumulative sum of each part will be this
ideal_cumsum = np.arange(1, parts) * ideal_sum
indices = np.searchsorted(cumulative_sum, ideal_cumsum, side="right")
# Check for repeated split points, which indicates that there is no way to
# split the array.
if np.unique(indices).size != indices.size:
raise ValueError(
"Could not find partition points to split the array into {} parts "
"of equal sum.".format(parts)
)
return indices
class _BaseSpatialCrossValidator(BaseCrossValidator, metaclass=ABCMeta):
    """
    Common foundation for the spatial cross-validators in this module.

    Stores the clustering configuration shared by every spatial splitter and
    validates the coordinate array before delegating index generation to
    scikit-learn's ``BaseCrossValidator``.

    Parameters
    ----------
    n_groups : int
        The number of groups to create. This is passed as
        'n_clusters=n_groups' for the KMeans algo, and
        'n_components=n_groups' for the GMM.
    coordinates : np.array
        A numpy array of coordinate values e.g.
        np.array([[3337270.,  262400.],
                  [3441390., -273060.], ...])
    method : str
        Name of the clustering algorithm used to separate data points.
    max_distance : int
        Distance threshold stored for use by subclasses when clustering.
    n_splits : int
        Number of splitting iterations.
    """

    def __init__(
        self,
        n_groups=None,
        coordinates=None,
        method=None,
        max_distance=None,
        n_splits=None,
    ):
        # Plain attribute storage; no validation happens at this level.
        self.n_splits = n_splits
        self.max_distance = max_distance
        self.method = method
        self.coordinates = coordinates
        self.n_groups = n_groups

    def split(self, X, y=None, groups=None):
        """
        Yield (train, test) index pairs for each split.

        Parameters
        ----------
        X : array-like, shape (n_samples, 2)
            Columns should be the easting and northing coordinates of data
            points, respectively.
        y : array-like, shape (n_samples,)
            The target variable for supervised learning problems. Always
            ignored.
        groups : array-like, with shape (n_samples,), optional
            Group labels for the samples used while splitting the dataset
            into train/test set. Always ignored.

        Yields
        ------
        train : ndarray
            The training set indices for that split.
        test : ndarray
            The testing set indices for that split.

        Raises
        ------
        ValueError
            If ``X`` does not have exactly two columns. Because this is a
            generator, the check runs when iteration starts.
        """
        if X.shape[1] != 2:
            raise ValueError(
                "X (the coordinate data) must have exactly 2 columns ({} given).".format(
                    X.shape[1]
                )
            )
        # The parent class turns the test indices into (train, test) pairs.
        yield from super().split(X, y, groups)

    def get_n_splits(self, X=None, y=None, groups=None):
        """
        Report how many splitting iterations this cross-validator performs.

        Parameters
        ----------
        X : object
            Always ignored, exists for compatibility.
        y : object
            Always ignored, exists for compatibility.
        groups : object
            Always ignored, exists for compatibility.

        Returns
        -------
        n_splits : int
            The ``n_splits`` value given at construction.
        """
        return self.n_splits

    @abstractmethod
    def _iter_test_indices(self, X=None, y=None, groups=None):
        """
        Produce the integer indices of each test set.

        Abstract hook: every concrete subclass must provide it.

        Parameters
        ----------
        X : array-like, shape (n_samples, 2)
            Columns should be the easting and northing coordinates of data
            points, respectively.
        y : array-like, shape (n_samples,)
            The target variable for supervised learning problems. Always
            ignored.
        groups : array-like, with shape (n_samples,), optional
            Group labels for the samples used while splitting the dataset
            into train/test set. Always ignored.

        Yields
        ------
        test : ndarray
            The testing set indices for that split.
        """
class _SpatialShuffleSplit(_BaseSpatialCrossValidator):
    """
    Random permutation of spatial cross-validator.

    Yields indices to split data into training and test sets. Data are first
    grouped into clusters using either a KMeans, GMM or hierarchical
    algorithm and are then split into testing and training sets randomly.

    The proportion of clusters assigned to each set is controlled by
    *test_size* and/or *train_size*. However, the total amount of actual data
    points in each set could be different from these values since clusters
    can have a different number of data points inside them. To keep the
    proportion of actual data as close as possible to the proportion of
    clusters, this cross-validator generates an extra number of splits and
    selects the one with proportion of data points in each set closer to the
    desired amount. The number of balance splits per iteration is controlled
    by the *balance* argument.

    This cross-validator is preferred over
    `sklearn.model_selection.ShuffleSplit` for spatial data to avoid
    overestimating cross-validation scores. This can happen because of the
    inherent spatial autocorrelation.

    Parameters
    ----------
    n_groups : int
        The number of groups to create. This is passed as
        'n_clusters=n_groups' for the KMeans algo, and
        'n_components=n_groups' for the GMM. If using method='Hierarchical'
        then this parameter is ignored.
    coordinates : np.array
        A numpy array of coordinate values e.g.
        np.array([[3337270.,  262400.],
                  [3441390., -273060.], ...])
    method : str
        Which algorithm to use to separate data points. Either 'KMeans',
        'GMM', or 'Hierarchical' (the default).
    max_distance : int
        If method is set to 'Hierarchical' then maximum distance describes
        the maximum euclidean distances between all observations in a
        cluster. 'n_groups' is ignored in this case.
    n_splits : int
        Number of re-shuffling & splitting iterations. Must be set to an
        integer before the splitter is iterated.
    test_size : float, int, None
        If float, should be between 0.0 and 1.0 and represent the proportion
        of the clusters to include in the test split. If int, represents the
        absolute number of test clusters. If None, the value is set to the
        complement of the train size. Defaults to 0.15.
    train_size : float, int, or None
        If float, should be between 0.0 and 1.0 and represent the
        proportion of the clusters to include in the train split. If
        int, represents the absolute number of train clusters. If None,
        the value is automatically set to the complement of the test size.
    random_state : int, RandomState instance or None, optional (default=None)
        If int, random_state is the seed used by the random number generator;
        If RandomState instance, random_state is the random number generator;
        If None, the random number generator is the RandomState instance used
        by `np.random`.
    balance : int
        The number of splits generated per iteration to try to balance the
        amount of data in each set so that *test_size* and *train_size* are
        respected. If 1, then no extra splits are generated (essentially
        disabling the balancing). Must be >= 1.
    **kwargs : optional,
        Additional keyword arguments to pass to sklearn.cluster.KMeans or
        sklearn.mixture.GaussianMixture depending on the method argument.

    Raises
    ------
    ValueError
        If *balance* is smaller than 1.
    """

    def __init__(
        self,
        n_groups=None,
        coordinates=None,
        method="Hierarchical",
        max_distance=None,
        n_splits=None,
        test_size=0.15,
        train_size=None,
        random_state=None,
        balance=10,
        **kwargs
    ):
        # The clustering kwargs are deliberately NOT forwarded to the base
        # initialiser: it accepts no extra keywords, so forwarding them would
        # raise TypeError whenever a caller supplied any.
        super().__init__(
            n_groups=n_groups,
            coordinates=coordinates,
            method=method,
            max_distance=max_distance,
            n_splits=n_splits,
        )
        if balance < 1:
            raise ValueError(
                "The *balance* argument must be >= 1. To disable balance, use 1."
            )
        self.test_size = test_size
        self.train_size = train_size
        self.random_state = random_state
        self.balance = balance
        # Kept for the clustering call made when test indices are generated.
        self.kwargs = kwargs

    def _iter_test_indices(self, X=None, y=None, groups=None):
        """
        Generates integer indices corresponding to test sets.

        For every requested split, *balance* candidate splits of the cluster
        ids are drawn and the candidate whose train/test ratio of data points
        is closest to its train/test ratio of clusters is yielded.

        Parameters
        ----------
        X : array-like, shape (n_samples, 2)
            Columns should be the easting and northing coordinates of data
            points, respectively. Not read here; the coordinates given at
            construction are clustered instead.
        y : array-like, shape (n_samples,)
            The target variable for supervised learning problems. Always
            ignored.
        groups : array-like, with shape (n_samples,), optional
            Group labels for the samples used while splitting the dataset
            into train/test set. Always ignored.

        Yields
        ------
        test : ndarray
            The testing set indices for that split.
        """
        labels = spatial_clusters(
            n_groups=self.n_groups,
            coordinates=self.coordinates,
            method=self.method,
            max_distance=self.max_distance,
            **self.kwargs
        )
        cluster_ids = np.unique(labels)
        # Generate many more splits so that we can pick and choose the ones
        # that have the right balance of training and testing data.
        shuffle = ShuffleSplit(
            n_splits=self.n_splits * self.balance,
            test_size=self.test_size,
            train_size=self.train_size,
            random_state=self.random_state,
        ).split(cluster_ids)
        for _ in range(self.n_splits):
            test_sets, imbalance = [], []
            for _ in range(self.balance):
                # Exactly n_splits * balance candidates were requested above,
                # so next() cannot run out here.
                # pylint: disable=stop-iteration-return
                train_clusters, test_clusters = next(shuffle)
                # pylint: enable=stop-iteration-return
                train_points = np.where(
                    np.isin(labels, cluster_ids[train_clusters])
                )[0]
                test_points = np.where(
                    np.isin(labels, cluster_ids[test_clusters])
                )[0]
                # The proportion of data points assigned to each group should
                # be close to the proportion of clusters assigned to each
                # group.
                imbalance.append(
                    abs(
                        train_points.size / test_points.size
                        - train_clusters.size / test_clusters.size
                    )
                )
                test_sets.append(test_points)
            best = np.argmin(imbalance)
            yield test_sets[best]
class _SpatialKFold(_BaseSpatialCrossValidator):
    """
    Spatial K-Folds cross-validator.

    Yields indices to split data into training and test sets. Data are first
    grouped into clusters using either a KMeans, GMM or hierarchical
    algorithm. The clusters are then split into testing and training sets
    iteratively along k folds of the data (k is given by *n_splits*).

    By default, the clusters are split into folds in a way that makes each
    fold have approximately the same number of data points. Sometimes this
    might not be possible, which can happen if the number of splits is close
    to the number of clusters. In these cases, a warning is issued and each
    fold will have the same number of clusters regardless of how many data
    points are in each cluster. This behaviour can also be disabled by
    setting ``balance=False``.

    This cross-validator is preferred over `sklearn.model_selection.KFold`
    for spatial data to avoid overestimating cross-validation scores. This
    can happen because of the inherent autocorrelation that is usually
    associated with this type of data.

    Parameters
    ----------
    n_groups : int
        The number of groups to create. This is passed as
        'n_clusters=n_groups' for the KMeans algo, and
        'n_components=n_groups' for the GMM. If using method='Hierarchical'
        then this parameter is ignored.
    coordinates : np.array
        A numpy array of coordinate values e.g.
        np.array([[3337270.,  262400.],
                  [3441390., -273060.], ...])
    method : str
        Which algorithm to use to separate data points. Either 'KMeans',
        'GMM', or 'Hierarchical' (the default).
    max_distance : int
        If method is set to 'Hierarchical' then maximum distance describes
        the maximum euclidean distances between all observations in a
        cluster. 'n_groups' is ignored in this case.
    n_splits : int
        Number of folds. Must be at least 2.
    test_size : float, int, None
        Stored on the instance but not used when generating folds; the
        test/train ratio is determined by *n_splits*.
    train_size : float, int, or None
        Stored on the instance but not used when generating folds.
    shuffle : bool
        Whether to shuffle the cluster ids before splitting them into folds.
    random_state : int, RandomState instance or None, optional (default=None)
        If int, random_state is the seed used by the random number generator;
        If RandomState instance, random_state is the random number generator;
        If None, the random number generator is the RandomState instance used
        by `np.random`.
    balance : bool
        Whether or not to split clusters into folds with approximately equal
        number of data points. If False, each fold will have the same number
        of clusters (which can have different number of data points in them).
    **kwargs : optional,
        Additional keyword arguments to pass to sklearn.cluster.KMeans or
        sklearn.mixture.GaussianMixture depending on the method argument.

    Raises
    ------
    ValueError
        If *n_splits* is smaller than 2.
    """

    def __init__(
        self,
        n_groups=None,
        coordinates=None,
        method="Hierarchical",
        max_distance=None,
        n_splits=5,
        test_size=0.15,
        train_size=None,
        shuffle=True,
        random_state=None,
        balance=True,
        **kwargs
    ):
        # The clustering kwargs are deliberately NOT forwarded to the base
        # initialiser: it accepts no extra keywords, so forwarding them would
        # raise TypeError whenever a caller supplied any.
        super().__init__(
            n_groups=n_groups,
            coordinates=coordinates,
            method=method,
            max_distance=max_distance,
            n_splits=n_splits,
        )
        if n_splits < 2:
            raise ValueError(
                "Number of splits must be >=2 for clusterKFold. Given {}.".format(
                    n_splits
                )
            )
        self.test_size = test_size
        self.train_size = train_size
        self.shuffle = shuffle
        self.random_state = random_state
        self.balance = balance
        # Kept for the clustering call made when test indices are generated.
        self.kwargs = kwargs

    def _iter_test_indices(self, X=None, y=None, groups=None):
        """
        Generates integer indices corresponding to test sets.

        Parameters
        ----------
        X : array-like, shape (n_samples, 2)
            Columns should be the easting and northing coordinates of data
            points, respectively. Not read here; the coordinates given at
            construction are clustered instead.
        y : array-like, shape (n_samples,)
            The target variable for supervised learning problems. Always
            ignored.
        groups : array-like, with shape (n_samples,), optional
            Group labels for the samples used while splitting the dataset
            into train/test set. Always ignored.

        Yields
        ------
        test : ndarray
            The testing set indices for that split.

        Raises
        ------
        ValueError
            If *n_splits* exceeds the number of clusters found.
        """
        labels = spatial_clusters(
            n_groups=self.n_groups,
            coordinates=self.coordinates,
            method=self.method,
            max_distance=self.max_distance,
            **self.kwargs
        )
        cluster_ids = np.unique(labels)
        if self.n_splits > cluster_ids.size:
            raise ValueError(
                "Number of k-fold splits ({}) cannot be greater than the number of "
                "clusters ({}). Either decrease n_splits or increase the number of "
                "clusters.".format(self.n_splits, cluster_ids.size)
            )
        if self.shuffle:
            check_random_state(self.random_state).shuffle(cluster_ids)

        # Each fold is an array of positions into cluster_ids.
        folds = None
        if self.balance:
            # Number of data points per cluster, in (possibly shuffled)
            # cluster order.
            cluster_sizes = [np.isin(labels, i).sum() for i in cluster_ids]
            try:
                split_points = _partition_by_sum(
                    cluster_sizes, parts=self.n_splits
                )
                folds = np.split(np.arange(cluster_ids.size), split_points)
            except ValueError:
                warnings.warn(
                    "Could not balance folds to have approximately the same "
                    "number of data points. Dividing into folds with equal "
                    "number of clusters instead. Decreasing n_splits or increasing "
                    "the number of clusters may help.",
                    UserWarning,
                )
        if folds is None:
            # Balancing disabled or impossible: equal number of clusters per
            # fold.
            folds = [
                test
                for _, test in KFold(n_splits=self.n_splits).split(cluster_ids)
            ]

        for test_clusters in folds:
            test_points = np.where(
                np.isin(labels, cluster_ids[test_clusters])
            )[0]
            yield test_points