# classification.py"""Machine learning classification tools for analysing remote sensing datausing the Open Data Cube.License: The code in this notebook is licensed under the Apache License,Version 2.0 (https://www.apache.org/licenses/LICENSE-2.0). Digital EarthAustralia data is licensed under the Creative Commons by Attribution 4.0license (https://creativecommons.org/licenses/by/4.0/).Contact: If you need assistance, please post a question on the Open DataCube Discord chat (https://discord.com/invite/4hhBQVas5U) or on the GIS StackExchange (https://gis.stackexchange.com/questions/ask?tags=open-data-cube)using the `open-data-cube` tag (you can view previously asked questionshere: https://gis.stackexchange.com/questions/tagged/open-data-cube).If you would like to report an issue with this script, you can file oneon GitHub (https://github.com/GeoscienceAustralia/dea-notebooks/issues/new).Last modified: May 2021"""importosimportsysimportjoblibimportnumpyasnpimportpandasaspdimportxarrayasxrimporttimeimportwarningsfromdatetimeimporttimedeltaimportgeopandasasgpdfromcopyimportdeepcopyfromtqdm.autoimporttqdmimportmultiprocessingasmpimportmatplotlib.pyplotaspltfromtypingimportCallable,Tuple,Any,Optional,List,Dictimportdask.arrayasdafromdask_ml.wrappersimportParallelPostFitimportdask.distributedasddfromdask.diagnosticsimportProgressBarfromsklearn.clusterimportKMeansfromsklearn.utilsimportcheck_random_statefromabcimportABCMeta,abstractmethodfromsklearn.baseimportClusterMixinfromsklearn.mixtureimportGaussianMixturefromsklearn.clusterimportAgglomerativeClusteringfromsklearn.model_selectionimportKFold,ShuffleSplitfromsklearn.model_selectionimportBaseCrossValidatorfromdatacube.utils.geometryimportassign_crsfromdatacube.utilsimportgeometryfromdea_tools.spatialimportxr_rasterize
def sklearn_flatten(input_xr):
    """
    Reshape a DataArray or Dataset with spatial (and optionally
    temporal) structure into an np.array with the spatial and temporal
    dimensions flattened into one dimension.

    This flattening procedure enables DataArrays and Datasets to be used
    to train and predict with sklearn models.

    Last modified: September 2019

    Parameters
    ----------
    input_xr : xarray.DataArray or xarray.Dataset
        Must have dimensions 'x' and 'y', may have dimension 'time'.
        Dimensions other than 'x', 'y' and 'time' are unaffected by the
        flattening.

    Returns
    ----------
    input_np : numpy.array
        A numpy array corresponding to input_xr.data (or
        input_xr.to_array().data), with dimensions 'x', 'y' and 'time'
        flattened into a single dimension, which is the first axis of
        the returned array. input_np contains no NaNs.

    """
    # cast input Datasets to DataArray
    if isinstance(input_xr, xr.Dataset):
        input_xr = input_xr.to_array()

    # stack across pixel dimensions, handling timeseries if necessary
    if "time" in input_xr.dims:
        stacked = input_xr.stack(z=["x", "y", "time"])
    else:
        stacked = input_xr.stack(z=["x", "y"])

    # finding 'bands' dimensions in each pixel - these will not be
    # flattened as their context is important for sklearn
    pxdims = []
    for dim in stacked.dims:
        if dim != "z":
            pxdims.append(dim)

    # mask NaNs - we mask pixels with NaNs in *any* band, because
    # sklearn cannot accept NaNs as input
    mask = np.isnan(stacked)
    if len(pxdims) != 0:
        mask = mask.any(dim=pxdims)

    # turn the mask into a numpy array (boolean indexing with xarrays
    # acts weird)
    mask = mask.data

    # the dimension we are masking along ('z') needs to be the first
    # dimension in the underlying np array for the boolean indexing to work
    stacked = stacked.transpose("z", *pxdims)
    input_np = stacked.data[~mask]

    return input_np

def sklearn_unflatten(output_np, input_xr):
    """
    Reshape a numpy array with no 'missing' elements (NaNs) and
    'flattened' spatiotemporal structure into a DataArray matching the
    spatiotemporal structure of the original input DataArray or Dataset.

    This enables an sklearn model's prediction to be remapped to the
    correct pixels in the input DataArray or Dataset.

    Last modified: September 2019

    Parameters
    ----------
    output_np : numpy.array
        The first dimension's length should correspond to the number of
        valid (non-NaN) pixels in input_xr.
    input_xr : xarray.DataArray or xarray.Dataset
        Must have dimensions 'x' and 'y', may have dimension 'time'.
        Dimensions other than 'x', 'y' and 'time' are unaffected by the
        flattening.

    Returns
    ----------
    output_xr : xarray.DataArray
        An xarray.DataArray with the same dimensions 'x', 'y' and 'time'
        as input_xr, and the same valid (non-NaN) pixels. These pixels
        are set to match the data in output_np.

    """
    # the output of a sklearn model prediction should just be a numpy array
    # with size matching x*y*time for the input DataArray/Dataset.

    # cast input Datasets to DataArray
    if isinstance(input_xr, xr.Dataset):
        input_xr = input_xr.to_array()

    # generate the same mask we used to create the input to the sklearn model
    if "time" in input_xr.dims:
        stacked = input_xr.stack(z=["x", "y", "time"])
    else:
        stacked = input_xr.stack(z=["x", "y"])

    pxdims = []
    for dim in stacked.dims:
        if dim != "z":
            pxdims.append(dim)

    mask = np.isnan(stacked)
    if len(pxdims) != 0:
        mask = mask.any(dim=pxdims)

    # handle multivariable output
    output_px_shape = ()
    if len(output_np.shape[1:]):
        output_px_shape = output_np.shape[1:]

    # use the mask to put the data in all the right places
    output_ma = np.ma.empty((len(stacked.z), *output_px_shape))
    output_ma[~mask] = output_np
    output_ma[mask] = np.ma.masked

    # set the stacked coordinate to match the input
    output_xr = xr.DataArray(
        output_ma,
        coords={"z": stacked["z"]},
        dims=[
            "z",
            *["output_dim_" + str(idx) for idx in range(len(output_px_shape))],
        ],
    )

    output_xr = output_xr.unstack()

    return output_xr

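
# Example usage: a minimal sketch of the flatten/unflatten round trip on a
# small synthetic two-band dataset with an unsupervised KMeans model. The
# dataset, band names and cluster count below are illustrative placeholders,
# not part of the library API.
def _example_flatten_unflatten():
    # build a small synthetic dataset with 'y'/'x' dimensions and two bands
    ds = xr.Dataset(
        {
            "red": (("y", "x"), np.random.rand(10, 10)),
            "nir": (("y", "x"), np.random.rand(10, 10)),
        },
        coords={"y": np.arange(10), "x": np.arange(10)},
    )

    # flatten to a (n_pixels, n_bands) numpy array with NaN pixels dropped
    flat = sklearn_flatten(ds)

    # fit/predict with any sklearn-style model on the flattened array
    labels = KMeans(n_clusters=3).fit_predict(flat)

    # map the 1D predictions back onto the original x/y grid
    labelled = sklearn_unflatten(labels, ds)

    return labelled
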
def fit_xr(model, input_xr):
    """
    Utilise our wrappers to fit a vanilla sklearn model.

    Last modified: September 2019

    Parameters
    ----------
    model : scikit-learn model or compatible object
        Must have a fit() method that takes numpy arrays.
    input_xr : xarray.DataArray or xarray.Dataset.
        Must have dimensions 'x' and 'y', may have dimension 'time'.

    Returns
    ----------
    model : a scikit-learn model which has been fitted to the data in
    the pixels of input_xr.

    """
    model = model.fit(sklearn_flatten(input_xr))
    return model

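
# Example usage: a minimal sketch of `fit_xr`, reusing the synthetic dataset
# pattern above; the KMeans model and its parameters are placeholders for any
# sklearn-compatible estimator with a fit() method.
def _example_fit_xr():
    ds = xr.Dataset(
        {
            "red": (("y", "x"), np.random.rand(20, 20)),
            "nir": (("y", "x"), np.random.rand(20, 20)),
        },
        coords={"y": np.arange(20), "x": np.arange(20)},
    )

    # fit an unsupervised sklearn model on every valid pixel in the dataset
    model = fit_xr(KMeans(n_clusters=4), ds)

    return model
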
def predict_xr(
    model,
    input_xr,
    chunk_size=None,
    persist=False,
    proba=False,
    max_proba=True,
    clean=False,
    return_input=False,
):
    """
    Using dask-ml ParallelPostFit(), runs the parallel predict and
    predict_proba methods of sklearn estimators. Useful for running
    predictions on larger-than-RAM datasets.

    Last modified: September 2020

    Parameters
    ----------
    model : scikit-learn model or compatible object
        Must have a .predict() method that takes numpy arrays.
    input_xr : xarray.DataArray or xarray.Dataset.
        Must have dimensions 'x' and 'y'
    chunk_size : int
        The dask chunk size to use on the flattened array. If this is
        left as None, then the chunk size is inferred from the .chunks
        method on the `input_xr`
    persist : bool
        If True, and proba=True, then 'input_xr' data will be loaded
        into distributed memory. This will ensure data is not loaded
        twice for the prediction of probabilities, but this will only
        work if the data is not larger than distributed RAM.
    proba : bool
        If True, predict probabilities
    max_proba : bool
        If True, the probabilities array will be flattened to contain
        only the probability for the "Predictions" class. If False, the
        "Probabilities" object will be an array of prediction
        probabilities for each class.
    clean : bool
        If True, remove Infs and NaNs from input and output arrays
    return_input : bool
        If True, then the data variables in the 'input_xr' dataset will
        be appended to the output xarray dataset.

    Returns
    ----------
    output_xr : xarray.Dataset
        An xarray.Dataset containing the prediction output from model.
        If proba=True then the dataset will also contain probabilities,
        and if return_input=True then the dataset will have the input
        feature layers. Has the same spatiotemporal structure as
        input_xr.

    """
    # if input_xr isn't dask, coerce it
    dask = True
    if not bool(input_xr.chunks):
        dask = False
        input_xr = input_xr.chunk({"x": len(input_xr.x), "y": len(input_xr.y)})

    # set chunk size if not supplied
    if chunk_size is None:
        chunk_size = int(input_xr.chunks["x"][0]) * int(input_xr.chunks["y"][0])

    def _predict_func(model, input_xr, persist, proba, max_proba, clean, return_input):
        x, y, crs = input_xr.x, input_xr.y, input_xr.geobox.crs

        input_data = []

        for var_name in input_xr.data_vars:
            input_data.append(input_xr[var_name])

        input_data_flattened = []

        for arr in input_data:
            data = arr.data.flatten().rechunk(chunk_size)
            input_data_flattened.append(data)

        # reshape for prediction
        input_data_flattened = da.array(input_data_flattened).transpose()

        if clean == True:
            input_data_flattened = da.where(
                da.isfinite(input_data_flattened), input_data_flattened, 0
            )

        if (proba == True) & (persist == True):
            # persisting data so we don't require loading all the data twice
            input_data_flattened = input_data_flattened.persist()

        # apply the classification
        print("predicting...")
        out_class = model.predict(input_data_flattened)

        # Mask out NaN or Inf values in results
        if clean == True:
            out_class = da.where(da.isfinite(out_class), out_class, 0)

        # Reshape when writing out
        out_class = out_class.reshape(len(y), len(x))

        # stack back into xarray
        output_xr = xr.DataArray(out_class, coords={"x": x, "y": y}, dims=["y", "x"])

        output_xr = output_xr.to_dataset(name="Predictions")

        if proba == True:
            print("   probabilities...")
            out_proba = model.predict_proba(input_data_flattened)

            # return either one band with the max probability,
            # or the whole probability array
            if max_proba == True:
                print("   returning single probability band.")
                out_proba = da.max(out_proba, axis=1) * 100.0
                out_proba = out_proba.reshape(len(y), len(x))

                out_proba = xr.DataArray(
                    out_proba, coords={"x": x, "y": y}, dims=["y", "x"]
                )
                output_xr["Probabilities"] = out_proba

            else:
                print("   returning class probability array.")
                out_proba = out_proba * 100.0

                # Get the unique class names from the fitted classifier
                class_names = model.classes_

                # Loop through each class (band)
                probabilities_dataset = xr.Dataset()
                for i, class_name in enumerate(class_names):
                    reshaped_band = out_proba[:, i].reshape(len(y), len(x))
                    reshaped_da = xr.DataArray(
                        reshaped_band, coords={"x": x, "y": y}, dims=["y", "x"]
                    )
                    probabilities_dataset[f"prob_{class_name}"] = reshaped_da

                # merge in the probabilities
                output_xr = xr.merge([output_xr, probabilities_dataset])

            if clean == True:
                out_proba = da.where(da.isfinite(out_proba), out_proba, 0)

        if return_input == True:
            print("   input features...")
            # unflatten the input_data_flattened array and append
            # to the output_xr containing the predictions
            arr = input_xr.to_array()
            stacked = arr.stack(z=["y", "x"])

            # handle multivariable output
            output_px_shape = ()
            if len(input_data_flattened.shape[1:]):
                output_px_shape = input_data_flattened.shape[1:]

            output_features = input_data_flattened.reshape(
                (len(stacked.z), *output_px_shape)
            )

            # set the stacked coordinate to match the input
            output_features = xr.DataArray(
                output_features,
                coords={"z": stacked["z"]},
                dims=[
                    "z",
                    *["output_dim_" + str(idx) for idx in range(len(output_px_shape))],
                ],
            ).unstack()

            # convert to dataset and rename arrays
            output_features = output_features.to_dataset(dim="output_dim_0")
            data_vars = list(input_xr.data_vars)
            output_features = output_features.rename(
                {i: j for i, j in zip(output_features.data_vars, data_vars)}
            )

            # merge with predictions
            output_xr = xr.merge([output_xr, output_features], compat="override")

        return assign_crs(output_xr, str(crs))

    if dask == True:
        # convert model to dask predict
        model = ParallelPostFit(model)
        with joblib.parallel_backend("dask"):
            output_xr = _predict_func(
                model, input_xr, persist, proba, max_proba, clean, return_input
            )

    else:
        output_xr = _predict_func(
            model, input_xr, persist, proba, max_proba, clean, return_input
        ).compute()

    return output_xr

class HiddenPrints:
    """
    For concealing unwanted print statements called by other functions
    """

    def __enter__(self):
        self._original_stdout = sys.stdout
        sys.stdout = open(os.devnull, "w")

    def __exit__(self, exc_type, exc_val, exc_tb):
        sys.stdout.close()
        sys.stdout = self._original_stdout

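
# Example usage: a minimal sketch of silencing chatty calls with
# `HiddenPrints`; the noisy function below is purely illustrative.
def _example_hidden_prints():
    def noisy_function():
        print("progress messages we don't want to see")
        return 42

    with HiddenPrints():
        result = noisy_function()  # printed output is discarded

    return result
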
def _get_training_data_for_shp(
    gdf: gpd.GeoDataFrame,
    index: int,
    row: gpd.GeoSeries,
    out_arrs: List[np.ndarray],
    out_vars: List[List[str]],
    dc_query: Dict,
    return_coords: bool,
    feature_func: Optional[Callable] = None,
    field: Optional[str] = None,
    zonal_stats: Optional[str] = None,
    time_field: Optional[str] = None,
    time_delta: Optional[timedelta] = None,
):
    """
    This is the core function that is triggered by `collect_training_data`.
    The `collect_training_data` function loops through geometries in a
    geopandas geodataframe and runs the code within
    `_get_training_data_for_shp`. Parameters are inherited from
    `collect_training_data`. See that function for information on the
    other params not listed below.

    Parameters
    ----------
    gdf : gpd.GeoDataFrame
        Geopandas GeoDataFrame containing geometries.
    index : int
        Index of the current geometry in the GeoDataFrame.
    row : gpd.GeoSeries
        GeoSeries representing the current row in the GeoDataFrame.
    out_arrs : List[np.ndarray]
        An empty list into which the training data arrays are stored.
    out_vars : List[List[str]]
        An empty list into which the data variable names are stored.
    dc_query : Dict
        ODC query.
    return_coords : bool
        Flag indicating whether to return coordinates in the dataset.
    feature_func : callable, optional
        Optional function to extract data based on `dc_query`.
        Defaults to None.
    field : str, optional
        Name of the class field. Defaults to None.
    zonal_stats : str, optional
        Zonal statistics method. Defaults to None.
    time_field : str, optional
        Name of the column containing timestamp data in the input gdf.
        Defaults to None.
    time_delta : timedelta, optional
        Time delta used to match a data point with all the scenes falling
        between `time_stamp - time_delta` and `time_stamp + time_delta`.
        Defaults to None.

    Returns
    --------
    Nothing is returned directly; a numpy.array of the class values and
    extracted data for each pixel or polygon is appended to `out_arrs`,
    and the corresponding data variable names are appended to `out_vars`.

    """
    # prevent function altering dictionary kwargs
    dc_query = deepcopy(dc_query)

    # remove dask chunks if supplied as we are using
    # multiprocessing for parallelization
    if "dask_chunks" in dc_query.keys():
        dc_query.pop("dask_chunks", None)

    # set up query based on polygon
    geom = geometry.Geometry(geom=gdf.iloc[index].geometry, crs=gdf.crs)
    q = {"geopolygon": geom}

    # merge polygon query with user supplied query params
    dc_query.update(q)

    # Update time range if a time window is specified
    if time_delta is not None:
        timestamp = gdf.loc[index][time_field]
        start_time = timestamp - time_delta
        end_time = timestamp + time_delta
        timestamp = {"time": (start_time, end_time)}
        # merge time query with user supplied query params
        dc_query.update(timestamp)

    # Use input feature function
    data = feature_func(dc_query)

    # if no data is present then return
    if len(data) == 0:
        return

    if gdf.iloc[[index]].geometry.geom_type.values != "Point":
        # If the geometry type is a polygon extract all pixels
        # create polygon mask
        mask = xr_rasterize(gdf.iloc[[index]], data)
        data = data.where(mask)

    # Check that feature_func has removed time
    if "time" in data.dims:
        t = data.dims["time"]
        if t > 1 and time_delta is not None:
            raise ValueError(
                "After running the feature_func, the dataset still has "
                + str(t)
                + " time-steps, dataset must only have"
                + " x and y dimensions."
            )

    if return_coords == True:
        # turn coords into a variable in the ds
        data["x_coord"] = data.x + 0 * data.y
        data["y_coord"] = data.y + 0 * data.x

    # append ID measurement to dataset for tracking failures
    band = [m for m in data.data_vars][0]
    _id = xr.zeros_like(data[band])
    data["id"] = _id
    data["id"] = data["id"] + gdf.iloc[index]["id"]

    # If no zonal stats were requested then extract all pixel values
    if zonal_stats is None:
        flat_train = sklearn_flatten(data)
        flat_val = np.repeat(row[field], flat_train.shape[0])
        stacked = np.hstack((np.expand_dims(flat_val, axis=1), flat_train))

    elif zonal_stats in ["mean", "median", "max", "min"]:
        method_to_call = getattr(data, zonal_stats)
        flat_train = method_to_call()
        flat_train = flat_train.to_array()
        stacked = np.hstack((row[field], flat_train))

    else:
        raise Exception(
            zonal_stats
            + " is not one of the supported"
            + " reduce functions ('mean','median','max','min')"
        )

    out_arrs.append(stacked)
    out_vars.append([field] + list(data.data_vars))


def _get_training_data_parallel(
    gdf: gpd.GeoDataFrame,
    dc_query: Dict,
    ncpus: int,
    return_coords: bool,
    feature_func: Optional[Callable] = None,
    field: Optional[str] = None,
    zonal_stats: Optional[str] = None,
    time_field: Optional[str] = None,
    time_delta: Optional[timedelta] = None,
) -> Tuple[List[str], List[Any]]:
    """
    Function passing the '_get_training_data_for_shp' function
    to a multiprocessing.Pool.
    Inherits variables from 'collect_training_data()'.
    """
    # Check if a dask client is running
    try:
        zx = None
        zx = dd.get_client()
    except:
        pass

    if zx is not None:
        raise ValueError(
            "You have a Dask Client running, which prevents \n"
            "this function from multiprocessing. Close the client."
        )

    # instantiate lists that can be shared across processes
    manager = mp.Manager()
    results = manager.list()
    column_names = manager.list()

    # progress bar
    pbar = tqdm(total=len(gdf))

    def update(*a):
        pbar.update()

    with mp.Pool(ncpus) as pool:
        for index, row in gdf.iterrows():
            pool.apply_async(
                _get_training_data_for_shp,
                [
                    gdf,
                    index,
                    row,
                    results,
                    column_names,
                    dc_query,
                    return_coords,
                    feature_func,
                    field,
                    zonal_stats,
                    time_field,
                    time_delta,
                ],
                callback=update,
            )

        pool.close()
        pool.join()
        pbar.close()

    return column_names, results

def collect_training_data(
    gdf: gpd.GeoDataFrame,
    dc_query: dict,
    ncpus: int = 1,
    return_coords: bool = False,
    feature_func: Callable = None,
    field: str = None,
    zonal_stats: str = None,
    clean: bool = True,
    fail_threshold: float = 0.02,
    fail_ratio: float = 0.5,
    max_retries: int = 3,
    time_field: str = None,
    time_delta: timedelta = None,
) -> Tuple[List[str], np.ndarray]:
    """
    This function provides methods for gathering training data from the
    ODC over geometries stored within a geopandas geodataframe. The
    function will return a 'model_input' array containing stacked
    training data arrays with all NaNs & Infs removed. In the instance
    where ncpus > 1, a parallel version of the function will be run
    (functions are passed to a mp.Pool()). This function can conduct
    zonal statistics if the supplied shapefile contains polygons.
    The 'feature_func' parameter defines what features to produce.

    Parameters
    ----------
    gdf : geopandas geodataframe
        geometry data in the form of a geopandas geodataframe
    dc_query : dictionary
        Datacube query object, should not contain lat and long (x or y)
        variables as these are supplied by the 'gdf' variable
    ncpus : int
        The number of cpus/processes over which to parallelize the
        gathering of training data (only if ncpus is > 1). Use
        'mp.cpu_count()' to determine the number of cpus available on a
        machine. Defaults to 1.
    return_coords : bool
        If True, then the training data will contain two extra columns
        'x_coord' and 'y_coord' corresponding to the x,y coordinate of
        each sample. This variable can be useful for handling spatial
        autocorrelation between samples later in the ML workflow.
    feature_func : function
        A function for generating feature layers that is applied to the
        data within the bounds of the input geometry. The 'feature_func'
        must accept a 'dc_query' object, and return a single
        xarray.Dataset or xarray.DataArray containing 2D coordinates
        (i.e. x, y - no time dimension). e.g.

            def feature_function(query):
                dc = datacube.Datacube(app='feature_layers')
                ds = dc.load(**query)
                ds = ds.mean('time')
                return ds

    field : str
        Name of the column in the gdf that contains the class labels
    zonal_stats : string, optional
        An optional string giving the names of zonal statistics to
        calculate for each polygon. Default is None (all pixel values
        are returned). Supported values are 'mean', 'median', 'max',
        'min'.
    clean : bool
        Whether or not to remove missing values in the training dataset.
        If True, training labels with any NaNs or Infs in the feature
        layers will be dropped from the dataset.
    fail_threshold : float, default 0.02
        Silent read fails on S3 can result in some rows of the returned
        data containing NaN values. The 'fail_threshold' fraction
        specifies the proportion of acceptable fails, e.g. setting
        'fail_threshold' to 0.05 means that if >5% of the samples in the
        training dataset fail, those samples will be returned to the
        multiprocessing queue. Below this fraction the function will
        accept the failures and return the results.
    fail_ratio : float
        A float between 0 and 1 that defines if a given training sample
        has failed. Default is 0.5, which means that if 50% of the
        measurements in a given sample return null values, and the
        number of total fails is more than the fail_threshold, the
        sample will be passed to the retry queue.
    max_retries : int, default 3
        Maximum number of times to retry collecting samples. This number
        is invoked if the 'fail_threshold' is not reached.
    time_field : str
        The name of the attribute in the input dataframe containing the
        capture timestamp.
    time_delta : timedelta
        The size of the window used as timestamp +/- time_delta. This is
        used to allow matching a single field data point with multiple
        scenes.

    Returns
    --------
    A list of the data variable (column) names, and a numpy.array
    containing the class labels and extracted data for each pixel or
    polygon.

    """
    # check the dtype of the class field
    if gdf[field].dtype != int:
        raise ValueError(
            'The "field" column of the input vector must contain integer dtypes'
        )

    # check for feature_func
    if feature_func is None:
        raise ValueError(
            "Please supply a feature layer function through the "
            + "parameter 'feature_func'"
        )

    if zonal_stats is not None:
        print("Taking zonal statistic: " + zonal_stats)

    # add unique id to gdf to help with indexing failed rows
    # during multiprocessing
    # if zonal_stats is not None:
    gdf["id"] = range(0, len(gdf))

    if ncpus == 1:
        # progress indicator
        print("Collecting training data in serial mode")
        i = 0

        # list to store results
        results = []
        column_names = []

        # loop through polys and extract training data
        for index, row in gdf.iterrows():
            print(" Feature {:04}/{:04}\r".format(i + 1, len(gdf)), end="")

            _get_training_data_for_shp(
                gdf,
                index,
                row,
                results,
                column_names,
                dc_query,
                return_coords,
                feature_func,
                field,
                zonal_stats,
                time_field,
                time_delta,
            )
            i += 1

    else:
        print("Collecting training data in parallel mode")
        column_names, results = _get_training_data_parallel(
            gdf=gdf,
            dc_query=dc_query,
            ncpus=ncpus,
            return_coords=return_coords,
            feature_func=feature_func,
            field=field,
            zonal_stats=zonal_stats,
            time_field=time_field,
            time_delta=time_delta,
        )

    # column names are appended during each iteration
    # but they are identical, grab only the first instance
    column_names = column_names[0]

    # Stack the extracted training data for each feature into a single array
    model_input = np.vstack(results)

    # this code block below iteratively retries failed rows
    # up to max_retries or until fail_threshold is
    # reached - whichever occurs first
    if ncpus > 1:
        i = 1
        while i <= max_retries:

            # Find % of fails (null values) in data. Use Pandas for simplicity
            df = pd.DataFrame(data=model_input[:, 0:-1], index=model_input[:, -1])

            # how many nan values per id?
            num_nans = df.isnull().sum(axis=1)
            num_nans = num_nans.groupby(num_nans.index).sum()

            # how many valid values per id?
            num_valid = df.notnull().sum(axis=1)
            num_valid = num_valid.groupby(num_valid.index).sum()

            # find fail rate
            perc_fail = num_nans / (num_nans + num_valid)
            fail_ids = perc_fail[perc_fail > fail_ratio]
            fail_rate = len(fail_ids) / len(gdf)

            print(
                "Percentage of possible fails after run "
                + str(i)
                + " = "
                + str(round(fail_rate * 100, 2))
                + " %"
            )

            if fail_rate > fail_threshold:
                print("Recollecting samples that failed")
                fail_ids = list(fail_ids.index)

                # keep only the ids in the model_input object that didn't fail
                model_input = model_input[~np.isin(model_input[:, -1], fail_ids)]

                # index out the fail_ids from the original gdf
                gdf_rerun = gdf.loc[gdf["id"].isin(fail_ids)]
                gdf_rerun = gdf_rerun.reset_index(drop=True)

                time.sleep(5)  # sleep for 5s to rest api

                # recollect failed rows
                (
                    column_names_again,
                    results_again,
                ) = _get_training_data_parallel(
                    gdf=gdf_rerun,
                    dc_query=dc_query,
                    ncpus=ncpus,
                    return_coords=return_coords,
                    feature_func=feature_func,
                    field=field,
                    zonal_stats=zonal_stats,
                    time_field=time_field,
                    time_delta=time_delta,
                )

                # Stack the extracted training data for each
                # feature into a single array
                model_input_again = np.vstack(results_again)

                # merge results of the re-run with the original run
                model_input = np.vstack((model_input, model_input_again))

                i += 1

            else:
                break

    # -----------------------------------------------

    # remove id column
    idx_var = column_names[0:-1]
    model_col_indices = [column_names.index(var_name) for var_name in idx_var]
    model_input = model_input[:, model_col_indices]

    if clean == True:
        num = np.count_nonzero(np.isnan(model_input).any(axis=1))
        model_input = model_input[~np.isnan(model_input).any(axis=1)]
        model_input = model_input[~np.isinf(model_input).any(axis=1)]
        print("Removed " + str(num) + " rows with NaNs &/or Infs")
        print("Output shape: ", model_input.shape)

    else:
        print("Returning data without cleaning")
        print("Output shape: ", model_input.shape)

    return column_names[0:-1], model_input

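
# Example usage: a hedged sketch of collecting training data from a labelled
# vector file. The file path, class column, product, measurements and query
# below are placeholders; the feature function mirrors the docstring example
# above and must return a dataset with only 'x' and 'y' dimensions.
def _example_collect_training_data():
    import datacube

    def feature_layers(query):
        dc = datacube.Datacube(app="feature_layers")
        ds = dc.load(
            product="ga_ls8c_ard_3",  # placeholder product name
            measurements=["nbart_red", "nbart_nir"],  # placeholder measurements
            **query
        )
        return ds.mean("time")  # collapse time so only x/y dims remain

    # training polygons with an integer class label column named 'class'
    gdf = gpd.read_file("training_polygons.geojson")  # placeholder path

    query = {
        "time": ("2020-01", "2020-12"),
        "output_crs": "EPSG:3577",
        "resolution": (-30, 30),
    }

    column_names, model_input = collect_training_data(
        gdf=gdf,
        dc_query=query,
        ncpus=1,
        feature_func=feature_layers,
        field="class",
        zonal_stats="mean",
    )

    return column_names, model_input
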
class KMeans_tree(ClusterMixin):
    """
    A hierarchical KMeans unsupervised clustering model. This class is
    a clustering model, so it inherits scikit-learn's ClusterMixin
    base class.

    Parameters
    ----------
    n_levels : integer, default 2
        Number of levels in the tree of clustering models.
    n_clusters : integer, default 3
        Number of clusters in each of the constituent KMeans models in
        the tree.
    **kwargs : optional
        Other keyword arguments to be passed directly to the KMeans
        initialiser.

    """

    def __init__(self, n_levels=2, n_clusters=3, **kwargs):

        assert n_levels >= 1

        self.base_model = KMeans(n_clusters=n_clusters, **kwargs)
        self.n_levels = n_levels
        self.n_clusters = n_clusters
        # make child models
        if n_levels > 1:
            self.branches = [
                KMeans_tree(n_levels=n_levels - 1, n_clusters=n_clusters, **kwargs)
                for _ in range(n_clusters)
            ]

    def fit(self, X, y=None, sample_weight=None):
        """
        Fit the tree of KMeans models. All parameters mimic those
        of KMeans.fit().

        Parameters
        ----------
        X : array-like or sparse matrix, shape=(n_samples, n_features)
            Training instances to cluster. It must be noted that the
            data will be converted to C ordering, which will cause a
            memory copy if the given data is not C-contiguous.
        y : Ignored
            not used, present here for API consistency by convention.
        sample_weight : array-like, shape (n_samples,), optional
            The weights for each observation in X. If None, all
            observations are assigned equal weight (default: None)
        """
        self.labels_ = self.base_model.fit(X, sample_weight=sample_weight).labels_

        if self.n_levels > 1:
            labels_old = np.copy(self.labels_)
            # make room to add the sub-cluster labels
            self.labels_ *= (self.n_clusters) ** (self.n_levels - 1)

            for clu in range(self.n_clusters):
                # fit child models on their corresponding partition of
                # the training set
                self.branches[clu].fit(
                    X[labels_old == clu],
                    sample_weight=(
                        sample_weight[labels_old == clu]
                        if sample_weight is not None
                        else None
                    ),
                )
                self.labels_[labels_old == clu] += self.branches[clu].labels_

        return self

    def predict(self, X, sample_weight=None):
        """
        Send X through the KMeans tree and predict the resultant
        cluster. Compatible with KMeans.predict().

        Parameters
        ----------
        X : {array-like, sparse matrix}, shape = [n_samples, n_features]
            New data to predict.
        sample_weight : array-like, shape (n_samples,), optional
            The weights for each observation in X. If None, all
            observations are assigned equal weight (default: None)

        Returns
        -------
        labels : array, shape [n_samples,]
            Index of the cluster each sample belongs to.
        """
        result = self.base_model.predict(X, sample_weight=sample_weight)

        if self.n_levels > 1:
            rescpy = np.copy(result)
            # make room to add the sub-cluster labels
            result *= (self.n_clusters) ** (self.n_levels - 1)

            for clu in range(self.n_clusters):
                result[rescpy == clu] += self.branches[clu].predict(
                    X[rescpy == clu],
                    sample_weight=(
                        sample_weight[rescpy == clu]
                        if sample_weight is not None
                        else None
                    ),
                )

        return result

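
# Example usage: a minimal sketch of the hierarchical KMeans tree on random
# data; with n_levels=2 and n_clusters=3 the model can produce up to nine
# leaf clusters. The array shape and random_state are illustrative only.
def _example_kmeans_tree():
    X = np.random.rand(500, 4)

    model = KMeans_tree(n_levels=2, n_clusters=3, random_state=0)
    model.fit(X)
    labels = model.predict(X)

    return labels
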
def spatial_clusters(
    coordinates,
    method="Hierarchical",
    max_distance=None,
    n_groups=None,
    verbose=False,
    **kwargs
):
    """
    Create spatial groups on coordinate data using KMeans clustering,
    a Gaussian Mixture model, or Agglomerative (hierarchical) clustering.

    Last modified: September 2020

    Parameters
    ----------
    n_groups : int
        The number of groups to create. This is passed as
        'n_clusters=n_groups' for the KMeans algo, and
        'n_components=n_groups' for the GMM. If using
        method='Hierarchical' then this parameter is ignored.
    coordinates : np.array
        A numpy array of coordinate values e.g.
        np.array([[3337270.,  262400.],
                  [3441390., -273060.], ...])
    method : str
        Which algorithm to use to separate data points. Either 'KMeans',
        'GMM', or 'Hierarchical'. If using 'Hierarchical' then
        max_distance must be set.
    max_distance : int
        If method is set to 'Hierarchical' then maximum distance
        describes the maximum euclidean distance between all
        observations in a cluster. 'n_groups' is ignored in this case.
    **kwargs : optional,
        Additional keyword arguments to pass to sklearn.cluster.KMeans
        or sklearn.mixture.GaussianMixture depending on the 'method'
        argument.

    Returns
    -------
    labels : array, shape [n_samples,]
        Index of the cluster each sample belongs to.

    """
    if method not in ["Hierarchical", "KMeans", "GMM"]:
        raise ValueError("Method must be one of: 'Hierarchical', 'KMeans' or 'GMM'")

    if (method in ["GMM", "KMeans"]) & (n_groups is None):
        raise ValueError(
            "The 'GMM' and 'KMeans' methods require explicitly setting 'n_groups'"
        )

    if (method == "Hierarchical") & (max_distance is None):
        raise ValueError("The 'Hierarchical' method requires setting max_distance")

    if method == "Hierarchical":
        cluster_label = AgglomerativeClustering(
            n_clusters=None,
            linkage="complete",
            distance_threshold=max_distance,
            **kwargs
        ).fit_predict(coordinates)

    if method == "KMeans":
        cluster_label = KMeans(n_clusters=n_groups, **kwargs).fit_predict(coordinates)

    if method == "GMM":
        cluster_label = GaussianMixture(n_components=n_groups, **kwargs).fit_predict(
            coordinates
        )

    if verbose:
        print("n clusters = " + str(len(np.unique(cluster_label))))

    return cluster_label

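
# Example usage: a minimal sketch of clustering random projected coordinates
# with the 'Hierarchical' method; the 500 km distance threshold is an
# illustrative value only.
def _example_spatial_clusters():
    coords = np.random.uniform(low=-2e6, high=2e6, size=(200, 2))

    labels = spatial_clusters(
        coordinates=coords,
        method="Hierarchical",
        max_distance=500_000,
        verbose=True,
    )

    return labels
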
def SKCV(
    coordinates,
    n_splits,
    cluster_method,
    kfold_method,
    test_size,
    balance,
    n_groups=None,
    max_distance=None,
    train_size=None,
    random_state=None,
    **kwargs
):
    """
    Generate spatial k-fold cross validation indices using coordinate data.
    This function wraps the 'SpatialShuffleSplit' and 'SpatialKFold' classes.
    These classes ingest coordinate data in the form of an
    np.array([[Eastings, northings]]) and assign samples to a spatial cluster
    using either a KMeans, Gaussian Mixture, or Agglomerative Clustering
    algorithm.

    This cross-validator is preferred over other sklearn.model_selection
    methods for spatial data to avoid overestimating cross-validation scores.
    This can happen because of the inherent spatial autocorrelation that is
    usually associated with this type of data.

    Last modified: Dec 2020

    Parameters
    ----------
    coordinates : np.array
        A numpy array of coordinate values e.g.
        np.array([[3337270.,  262400.],
                  [3441390., -273060.], ...])
    n_splits : int
        The number of test-train cross validation splits to generate.
    cluster_method : str
        Which algorithm to use to separate data points. Either 'KMeans',
        'GMM', or 'Hierarchical'
    kfold_method : str
        One of either 'SpatialShuffleSplit' or 'SpatialKFold'. See the
        docs under class:_SpatialShuffleSplit and class:_SpatialKFold
        for more information on these options.
    test_size : float, int, None
        If float, should be between 0.0 and 1.0 and represent the
        proportion of the dataset to include in the test split. If int,
        represents the absolute number of test samples. If None, the
        value is set to the complement of the train size. If
        ``train_size`` is also None, it will be set to 0.15.
    balance : int or bool
        if setting kfold_method to 'SpatialShuffleSplit': int
            The number of splits generated per iteration to try to
            balance the amount of data in each set so that *test_size*
            and *train_size* are respected. If 1, then no extra splits
            are generated (essentially disabling the balancing). Must be
            >= 1.
        if setting kfold_method to 'SpatialKFold': bool
            Whether or not to split clusters into folds with
            approximately equal numbers of data points. If False, each
            fold will have the same number of clusters (which can have
            different numbers of data points in them).
    n_groups : int
        The number of groups to create. This is passed as
        'n_clusters=n_groups' for the KMeans algo, and
        'n_components=n_groups' for the GMM. If using
        cluster_method='Hierarchical' then this parameter is ignored.
    max_distance : int
        If method is set to 'Hierarchical' then maximum distance
        describes the maximum euclidean distance between all
        observations in a cluster. 'n_groups' is ignored in this case.
    train_size : float, int, or None
        If float, should be between 0.0 and 1.0 and represent the
        proportion of the dataset to include in the train split. If int,
        represents the absolute number of train samples. If None, the
        value is automatically set to the complement of the test size.
    random_state : int, RandomState instance or None, optional (default=None)
        If int, random_state is the seed used by the random number
        generator; If RandomState instance, random_state is the random
        number generator; If None, the random number generator is the
        RandomState instance used by `np.random`.
    **kwargs : optional,
        Additional keyword arguments to pass to sklearn.cluster.KMeans
        or sklearn.mixture.GaussianMixture depending on the
        cluster_method argument.

    Returns
    --------
    An instance of either _SpatialShuffleSplit or _SpatialKFold; its
    .split() method yields the train/test indices for each cross
    validation split.

    """
    # instantiate the requested splitter
    if kfold_method == "SpatialShuffleSplit":
        splitter = _SpatialShuffleSplit(
            n_groups=n_groups,
            method=cluster_method,
            coordinates=coordinates,
            max_distance=max_distance,
            test_size=test_size,
            train_size=train_size,
            n_splits=n_splits,
            random_state=random_state,
            balance=balance,
            **kwargs
        )

    if kfold_method == "SpatialKFold":
        splitter = _SpatialKFold(
            n_groups=n_groups,
            coordinates=coordinates,
            max_distance=max_distance,
            method=cluster_method,
            test_size=test_size,
            n_splits=n_splits,
            random_state=random_state,
            balance=balance,
            **kwargs
        )

    return splitter

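
# Example usage: a hedged sketch of passing the SKCV splitter to
# sklearn.model_selection.cross_val_score. The random features, labels and
# coordinates stand in for real training data, and the classifier and
# clustering settings are illustrative placeholders.
def _example_skcv():
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import cross_val_score

    rng = np.random.default_rng(0)
    coords = rng.uniform(-2e6, 2e6, size=(300, 2))
    X = rng.random((300, 5))
    y = rng.integers(0, 2, size=300)

    skcv = SKCV(
        coordinates=coords,
        n_splits=5,
        cluster_method="Hierarchical",
        kfold_method="SpatialKFold",
        test_size=0.2,
        balance=True,
        max_distance=500_000,  # illustrative clustering threshold (metres)
    )

    # pass the generated spatial splits directly to cross_val_score
    scores = cross_val_score(
        RandomForestClassifier(), X, y, cv=skcv.split(coords)
    )

    return scores
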
def spatial_train_test_split(
    X,
    y,
    coordinates,
    cluster_method,
    kfold_method,
    balance,
    test_size=None,
    n_splits=None,
    n_groups=None,
    max_distance=None,
    train_size=None,
    random_state=None,
    **kwargs
):
    """
    Split arrays into random train and test subsets. Similar to
    `sklearn.model_selection.train_test_split` but instead works on
    spatial coordinate data. Coordinate data is grouped according to
    either a KMeans, Gaussian Mixture, or Agglomerative Clustering
    algorithm. Grouping by spatial clusters is preferred over plain
    random splits for spatial data to avoid overestimating validation
    scores due to spatial autocorrelation.

    Parameters
    ----------
    X : np.array
        Training data features
    y : np.array
        Training data labels
    coordinates : np.array
        A numpy array of coordinate values e.g.
        np.array([[3337270.,  262400.],
                  [3441390., -273060.], ...])
    cluster_method : str
        Which algorithm to use to separate data points. Either 'KMeans',
        'GMM', or 'Hierarchical'
    kfold_method : str
        One of either 'SpatialShuffleSplit' or 'SpatialKFold'. See the
        docs under class:_SpatialShuffleSplit and class:_SpatialKFold
        for more information on these options.
    balance : int or bool
        if setting kfold_method to 'SpatialShuffleSplit': int
            The number of splits generated per iteration to try to
            balance the amount of data in each set so that *test_size*
            and *train_size* are respected. If 1, then no extra splits
            are generated (essentially disabling the balancing). Must be
            >= 1.
        if setting kfold_method to 'SpatialKFold': bool
            Whether or not to split clusters into folds with
            approximately equal numbers of data points. If False, each
            fold will have the same number of clusters (which can have
            different numbers of data points in them).
    test_size : float, int, None
        If float, should be between 0.0 and 1.0 and represent the
        proportion of the dataset to include in the test split. If int,
        represents the absolute number of test samples. If None, the
        value is set to the complement of the train size. If
        ``train_size`` is also None, it will be set to 0.15.
    n_splits : int
        This parameter is invoked for the 'SpatialKFold' folding method;
        use this number to satisfy the train-test size ratio desired, as
        the 'test_size' parameter for the KFold method often fails to
        get the ratio right.
    n_groups : int
        The number of groups to create. This is passed as
        'n_clusters=n_groups' for the KMeans algo, and
        'n_components=n_groups' for the GMM. If using
        cluster_method='Hierarchical' then this parameter is ignored.
    max_distance : int
        If method is set to 'Hierarchical' then maximum distance
        describes the maximum euclidean distance between all
        observations in a cluster. 'n_groups' is ignored in this case.
    train_size : float, int, or None
        If float, should be between 0.0 and 1.0 and represent the
        proportion of the dataset to include in the train split. If int,
        represents the absolute number of train samples. If None, the
        value is automatically set to the complement of the test size.
    random_state : int, RandomState instance or None, optional
        If int, random_state is the seed used by the random number
        generator; If RandomState instance, random_state is the random
        number generator; If None, the random number generator is the
        RandomState instance used by `np.random`.
    **kwargs : optional,
        Additional keyword arguments to pass to sklearn.cluster.KMeans
        or sklearn.mixture.GaussianMixture depending on the
        cluster_method argument.

    Returns
    -------
    Tuple :
        Contains four arrays in the following order:
        X_train, X_test, y_train, y_test

    """
    if kfold_method == "SpatialShuffleSplit":
        splitter = _SpatialShuffleSplit(
            n_groups=n_groups,
            method=cluster_method,
            coordinates=coordinates,
            max_distance=max_distance,
            test_size=test_size,
            train_size=train_size,
            n_splits=1 if n_splits is None else n_splits,
            random_state=random_state,
            balance=balance,
            **kwargs
        )

    if kfold_method == "SpatialKFold":
        if n_splits is None:
            raise ValueError(
                "n_splits parameter requires an integer value, eg. 'n_splits=5'"
            )
        if (test_size is not None) or (train_size is not None):
            warnings.warn(
                "With the 'SpatialKFold' method, controlling the test/train ratio "
                "is better achieved using the 'n_splits' parameter"
            )

        splitter = _SpatialKFold(
            n_groups=n_groups,
            coordinates=coordinates,
            max_distance=max_distance,
            method=cluster_method,
            n_splits=n_splits,
            random_state=random_state,
            balance=balance,
            **kwargs
        )

    lst = []
    for train, test in splitter.split(coordinates):
        X_tr, X_tt = X[train, :], X[test, :]
        y_tr, y_tt = y[train], y[test]
        lst.extend([X_tr, X_tt, y_tr, y_tt])

    return (lst[0], lst[1], lst[2], lst[3])

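
# Example usage: a minimal sketch of a spatially aware train/test split on
# random data; grouping into 10 KMeans clusters and a 20% test fraction are
# illustrative choices only.
def _example_spatial_train_test_split():
    rng = np.random.default_rng(42)
    coords = rng.uniform(-2e6, 2e6, size=(300, 2))
    X = rng.random((300, 5))
    y = rng.integers(0, 2, size=300)

    X_train, X_test, y_train, y_test = spatial_train_test_split(
        X=X,
        y=y,
        coordinates=coords,
        cluster_method="KMeans",
        kfold_method="SpatialShuffleSplit",
        balance=5,
        test_size=0.2,
        n_groups=10,
        random_state=0,
    )

    return X_train.shape, X_test.shape
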
def _partition_by_sum(array, parts):
    """
    Partition an array into parts of approximately equal sum.
    Does not change the order of the array elements.
    Produces the partition indices on the array. Use :func:`numpy.split` to
    divide the array along these indices.

    Parameters
    ----------
    array : array or array-like
        The 1D array that will be partitioned. The array will be raveled
        before computations.
    parts : int
        Number of parts to split the array. Can be at most the number of
        elements in the array.

    Returns
    -------
    indices : array
        The indices in which the array should be split.

    Notes
    -----
    Solution from https://stackoverflow.com/a/54024280

    """
    array = np.atleast_1d(array).ravel()
    if parts > array.size:
        raise ValueError(
            "Cannot partition an array of size {} into {} parts of equal sum.".format(
                array.size, parts
            )
        )
    cumulative_sum = array.cumsum()
    # Ideally, we want each part to have the same number of points (total /
    # parts).
    ideal_sum = cumulative_sum[-1] // parts
    # If the parts are ideal, the cumulative sum of each part will be this
    ideal_cumsum = np.arange(1, parts) * ideal_sum
    indices = np.searchsorted(cumulative_sum, ideal_cumsum, side="right")
    # Check for repeated split points, which indicates that there is no way to
    # split the array.
    if np.unique(indices).size != indices.size:
        raise ValueError(
            "Could not find partition points to split the array into {} parts "
            "of equal sum.".format(parts)
        )
    return indices


class _BaseSpatialCrossValidator(BaseCrossValidator, metaclass=ABCMeta):
    """
    Base class for spatial cross-validators.

    Parameters
    ----------
    n_groups : int
        The number of groups to create. This is passed as
        'n_clusters=n_groups' for the KMeans algo, and
        'n_components=n_groups' for the GMM.
    coordinates : np.array
        A numpy array of coordinate values e.g.
        np.array([[3337270.,  262400.],
                  [3441390., -273060.], ...])
    method : str
        Which algorithm to use to separate data points. Either 'KMeans'
        or 'GMM'
    n_splits : int
        Number of splitting iterations.

    """

    def __init__(
        self,
        n_groups=None,
        coordinates=None,
        method=None,
        max_distance=None,
        n_splits=None,
    ):
        self.n_groups = n_groups
        self.coordinates = coordinates
        self.method = method
        self.max_distance = max_distance
        self.n_splits = n_splits

    def split(self, X, y=None, groups=None):
        """
        Generate indices to split data into training and test set.

        Parameters
        ----------
        X : array-like, shape (n_samples, 2)
            Columns should be the easting and northing coordinates of
            data points, respectively.
        y : array-like, shape (n_samples,)
            The target variable for supervised learning problems.
            Always ignored.
        groups : array-like, with shape (n_samples,), optional
            Group labels for the samples used while splitting the
            dataset into train/test set. Always ignored.

        Yields
        ------
        train : ndarray
            The training set indices for that split.
        test : ndarray
            The testing set indices for that split.

        """
        if X.shape[1] != 2:
            raise ValueError(
                "X (the coordinate data) must have exactly 2 columns ({} given).".format(
                    X.shape[1]
                )
            )
        for train, test in super().split(X, y, groups):
            yield train, test

    def get_n_splits(self, X=None, y=None, groups=None):
        """
        Returns the number of splitting iterations in the cross-validator

        Parameters
        ----------
        X : object
            Always ignored, exists for compatibility.
        y : object
            Always ignored, exists for compatibility.
        groups : object
            Always ignored, exists for compatibility.

        Returns
        -------
        n_splits : int
            Returns the number of splitting iterations in the
            cross-validator.
        """
        return self.n_splits

    @abstractmethod
    def _iter_test_indices(self, X=None, y=None, groups=None):
        """
        Generates integer indices corresponding to test sets.

        MUST BE IMPLEMENTED BY DERIVED CLASSES.

        Parameters
        ----------
        X : array-like, shape (n_samples, 2)
            Columns should be the easting and northing coordinates of
            data points, respectively.
        y : array-like, shape (n_samples,)
            The target variable for supervised learning problems.
            Always ignored.
        groups : array-like, with shape (n_samples,), optional
            Group labels for the samples used while splitting the
            dataset into train/test set. Always ignored.

        Yields
        ------
        test : ndarray
            The testing set indices for that split.

        """


class _SpatialShuffleSplit(_BaseSpatialCrossValidator):
    """
    Random permutation of spatial cross-validator.

    Yields indices to split data into training and test sets. Data are
    first grouped into clusters using either a KMeans or GMM algorithm
    and are then split into testing and training sets randomly.

    The proportion of clusters assigned to each set is controlled by
    *test_size* and/or *train_size*. However, the total amount of actual
    data points in each set could be different from these values since
    clusters can have a different number of data points inside them.

    To guarantee that the proportion of actual data is as close as
    possible to the proportion of clusters, this cross-validator
    generates an extra number of splits and selects the one with the
    proportion of data points in each set closest to the desired amount.
    The number of balance splits per iteration is controlled by the
    *balance* argument.

    This cross-validator is preferred over
    `sklearn.model_selection.ShuffleSplit` for spatial data to avoid
    overestimating cross-validation scores. This can happen because of
    the inherent spatial autocorrelation.

    Parameters
    ----------
    n_groups : int
        The number of groups to create. This is passed as
        'n_clusters=n_groups' for the KMeans algo, and
        'n_components=n_groups' for the GMM. If using
        cluster_method='Hierarchical' then this parameter is ignored.
    coordinates : np.array
        A numpy array of coordinate values e.g.
        np.array([[3337270.,  262400.],
                  [3441390., -273060.], ...])
    cluster_method : str
        Which algorithm to use to separate data points. Either 'KMeans',
        'GMM', or 'Hierarchical'
    max_distance : int
        If method is set to 'Hierarchical' then maximum distance
        describes the maximum euclidean distance between all
        observations in a cluster. 'n_groups' is ignored in this case.
    n_splits : int,
        Number of re-shuffling & splitting iterations.
    test_size : float, int, None
        If float, should be between 0.0 and 1.0 and represent the
        proportion of the dataset to include in the test split. If int,
        represents the absolute number of test samples. If None, the
        value is set to the complement of the train size. If
        ``train_size`` is also None, it will be set to 0.1.
    train_size : float, int, or None
        If float, should be between 0.0 and 1.0 and represent the
        proportion of the dataset to include in the train split. If int,
        represents the absolute number of train samples. If None, the
        value is automatically set to the complement of the test size.
    random_state : int, RandomState instance or None, optional (default=None)
        If int, random_state is the seed used by the random number
        generator; If RandomState instance, random_state is the random
        number generator; If None, the random number generator is the
        RandomState instance used by `np.random`.
    balance : int
        The number of splits generated per iteration to try to balance
        the amount of data in each set so that *test_size* and
        *train_size* are respected. If 1, then no extra splits are
        generated (essentially disabling the balancing). Must be >= 1.
    **kwargs : optional,
        Additional keyword arguments to pass to sklearn.cluster.KMeans
        or sklearn.mixture.GaussianMixture depending on the
        cluster_method argument.

    Returns
    --------
    generator containing indices to split data into training and test
    sets
    """

    def __init__(
        self,
        n_groups=None,
        coordinates=None,
        method="Hierarchical",
        max_distance=None,
        n_splits=None,
        test_size=0.15,
        train_size=None,
        random_state=None,
        balance=10,
        **kwargs
    ):
        super().__init__(
            n_groups=n_groups,
            coordinates=coordinates,
            method=method,
            max_distance=max_distance,
            n_splits=n_splits,
        )
        if balance < 1:
            raise ValueError(
                "The *balance* argument must be >= 1. To disable balance, use 1."
            )
        self.test_size = test_size
        self.train_size = train_size
        self.random_state = random_state
        self.balance = balance
        self.kwargs = kwargs

    def _iter_test_indices(self, X=None, y=None, groups=None):
        """
        Generates integer indices corresponding to test sets.

        Runs several iterations until a split is found that yields
        clusters with the right amount of data points in it.

        Parameters
        ----------
        X : array-like, shape (n_samples, 2)
            Columns should be the easting and northing coordinates of
            data points, respectively.
        y : array-like, shape (n_samples,)
            The target variable for supervised learning problems.
            Always ignored.
        groups : array-like, with shape (n_samples,), optional
            Group labels for the samples used while splitting the
            dataset into train/test set. Always ignored.

        Yields
        ------
        test : ndarray
            The testing set indices for that split.

        """
        labels = spatial_clusters(
            n_groups=self.n_groups,
            coordinates=self.coordinates,
            method=self.method,
            max_distance=self.max_distance,
            **self.kwargs
        )

        cluster_ids = np.unique(labels)
        # Generate many more splits so that we can pick and choose the ones
        # that have the right balance of training and testing data.
        shuffle = ShuffleSplit(
            n_splits=self.n_splits * self.balance,
            test_size=self.test_size,
            train_size=self.train_size,
            random_state=self.random_state,
        ).split(cluster_ids)

        for _ in range(self.n_splits):
            test_sets, balance = [], []
            for _ in range(self.balance):
                # This is a false positive in pylint which is why the warning
                # is disabled at the top of this file:
                # https://github.com/PyCQA/pylint/issues/1830
                # pylint: disable=stop-iteration-return
                train_clusters, test_clusters = next(shuffle)
                # pylint: enable=stop-iteration-return
                train_points = np.where(np.isin(labels, cluster_ids[train_clusters]))[0]
                test_points = np.where(np.isin(labels, cluster_ids[test_clusters]))[0]
                # The proportion of data points assigned to each group should
                # be close to the proportion of clusters assigned to each
                # group.
                balance.append(
                    abs(
                        train_points.size / test_points.size
                        - train_clusters.size / test_clusters.size
                    )
                )
                test_sets.append(test_points)
            best = np.argmin(balance)
            yield test_sets[best]


class _SpatialKFold(_BaseSpatialCrossValidator):
    """
    Spatial K-Folds cross-validator.

    Yields indices to split data into training and test sets. Data are
    first grouped into clusters using either a KMeans or GMM algorithm.
    The clusters are then split into testing and training sets
    iteratively along k folds of the data (k is given by *n_splits*).

    By default, the clusters are split into folds in a way that makes
    each fold have approximately the same number of data points.
    Sometimes this might not be possible, which can happen if the number
    of splits is close to the number of clusters. In these cases, each
    fold will have the same number of clusters regardless of how many
    data points are in each cluster. This behaviour can also be disabled
    by setting ``balance=False``.

    This cross-validator is preferred over
    `sklearn.model_selection.KFold` for spatial data to avoid
    overestimating cross-validation scores. This can happen because of
    the inherent autocorrelation that is usually associated with this
    type of data.

    Parameters
    ----------
    n_groups : int
        The number of groups to create. This is passed as
        'n_clusters=n_groups' for the KMeans algo, and
        'n_components=n_groups' for the GMM. If using
        cluster_method='Hierarchical' then this parameter is ignored.
    coordinates : np.array
        A numpy array of coordinate values e.g.
        np.array([[3337270.,  262400.],
                  [3441390., -273060.], ...])
    cluster_method : str
        Which algorithm to use to separate data points. Either 'KMeans',
        'GMM', or 'Hierarchical'
    max_distance : int
        If method is set to 'Hierarchical' then maximum distance
        describes the maximum euclidean distance between all
        observations in a cluster. 'n_groups' is ignored in this case.
    n_splits : int
        Number of folds. Must be at least 2.
    shuffle : bool
        Whether to shuffle the data before splitting into batches.
    random_state : int, RandomState instance or None, optional (default=None)
        If int, random_state is the seed used by the random number
        generator; If RandomState instance, random_state is the random
        number generator; If None, the random number generator is the
        RandomState instance used by `np.random`.
    balance : bool
        Whether or not to split clusters into folds with approximately
        equal numbers of data points. If False, each fold will have the
        same number of clusters (which can have different numbers of
        data points in them).
    **kwargs : optional,
        Additional keyword arguments to pass to sklearn.cluster.KMeans
        or sklearn.mixture.GaussianMixture depending on the
        cluster_method argument.

    """

    def __init__(
        self,
        n_groups=None,
        coordinates=None,
        method="Hierarchical",
        max_distance=None,
        n_splits=5,
        test_size=0.15,
        train_size=None,
        shuffle=True,
        random_state=None,
        balance=True,
        **kwargs
    ):
        super().__init__(
            n_groups=n_groups,
            coordinates=coordinates,
            method=method,
            max_distance=max_distance,
            n_splits=n_splits,
        )
        if n_splits < 2:
            raise ValueError(
                "Number of splits must be >=2 for clusterKFold. Given {}.".format(
                    n_splits
                )
            )
        self.test_size = test_size
        self.shuffle = shuffle
        self.random_state = random_state
        self.balance = balance
        self.kwargs = kwargs

    def _iter_test_indices(self, X=None, y=None, groups=None):
        """
        Generates integer indices corresponding to test sets.

        Parameters
        ----------
        X : array-like, shape (n_samples, 2)
            Columns should be the easting and northing coordinates of
            data points, respectively.
        y : array-like, shape (n_samples,)
            The target variable for supervised learning problems.
            Always ignored.
        groups : array-like, with shape (n_samples,), optional
            Group labels for the samples used while splitting the
            dataset into train/test set. Always ignored.

        Yields
        ------
        test : ndarray
            The testing set indices for that split.

        """
        labels = spatial_clusters(
            n_groups=self.n_groups,
            coordinates=self.coordinates,
            method=self.method,
            max_distance=self.max_distance,
            **self.kwargs
        )

        cluster_ids = np.unique(labels)
        if self.n_splits > cluster_ids.size:
            raise ValueError(
                "Number of k-fold splits ({}) cannot be greater than the number of "
                "clusters ({}). Either decrease n_splits or increase the number of "
                "clusters.".format(self.n_splits, cluster_ids.size)
            )
        if self.shuffle:
            check_random_state(self.random_state).shuffle(cluster_ids)
        if self.balance:
            cluster_sizes = [np.isin(labels, i).sum() for i in cluster_ids]
            try:
                split_points = _partition_by_sum(cluster_sizes, parts=self.n_splits)
                folds = np.split(np.arange(cluster_ids.size), split_points)
            except ValueError:
                warnings.warn(
                    "Could not balance folds to have approximately the same "
                    "number of data points. Dividing into folds with equal "
                    "number of clusters instead. Decreasing n_splits or increasing "
                    "the number of clusters may help.",
                    UserWarning,
                )
                folds = [i for _, i in KFold(n_splits=self.n_splits).split(cluster_ids)]
        else:
            folds = [i for _, i in KFold(n_splits=self.n_splits).split(cluster_ids)]
        for test_clusters in folds:
            test_points = np.where(np.isin(labels, cluster_ids[test_clusters]))[0]
            yield test_points

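
# Example usage: a minimal sketch of the `_partition_by_sum` helper, splitting
# an array into three contiguous parts of approximately equal sum; the input
# values are illustrative only.
def _example_partition_by_sum():
    sizes = np.array([5, 3, 4, 6, 2, 4, 6])

    # indices at which to split so that each part sums to roughly 10
    split_points = _partition_by_sum(sizes, parts=3)

    return np.split(sizes, split_points)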