# cog.py
"""
Generate Cloud Optimised GeoTIFF (COG) mosaics for DEA tiled products.
This module builds continental-scale mosaics by combining individual DEA tiles
into a single Cloud Optimised GeoTIFF using GDAL tools (`gdalbuildvrt`, `gdal_translate`).
It supports DEA's tiled product structure and naming conventions and can read from both
local disk and public S3 buckets (e.g., `dea-public-data` or `dea-public-data-dev`).
Input Format
------------
Input products must follow the DEA Collection 3 naming conventions and file structure, e.g.:
`s3://dea-public-data/derivative/<product>/<version>/<tile path>/<year>--<freq>/<product>_<tile path>_<year>--<freq>_<dataset maturity>_<band>.tif`
For more information:
https://knowledge.dea.ga.gov.au/guides/reference/collection_3_naming/
https://knowledge.dea.ga.gov.au/guides/reference/collection_3_summary_grid/
Output Format
-------------
Mosaics are saved as:
`<output_dir>/<product>/<version>/continental_mosaics/<time>--<freq>/<product>_mosaic_<time>--<freq>_<band>.tif`
License: The code in this module is licensed under the Apache License,
Version 2.0 (https://www.apache.org/licenses/LICENSE-2.0). Digital Earth
Australia data is licensed under the Creative Commons by Attribution 4.0
license (https://creativecommons.org/licenses/by/4.0/).
Contact: If you need assistance, please post a question on the Open Data
Cube Discord chat (https://discord.com/invite/4hhBQVas5U) or on the GIS Stack
Exchange (https://gis.stackexchange.com/questions/ask?tags=open-data-cube)
using the `open-data-cube` tag (you can view previously asked questions
here: https://gis.stackexchange.com/questions/tagged/open-data-cube).
If you would like to report an issue with this script, you can file one on
GitHub (https://github.com/GeoscienceAustralia/dea-notebooks/issues/new).
Last modified: July 2025
"""
import glob
import logging
import os
import shutil
import subprocess
import tempfile
from urllib.parse import urlparse
import click
import s3fs
from odc.stac import configure_rio
from dea_tools.mosaics.utils import _is_s3, _get_vsicurlhttp_from_s3, _get_vsis3_from_s3
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
def _get_tiles(
product_dir,
product,
version,
time,
freq,
dataset_maturity,
band,
is_s3,
aws_unsigned,
list_tiles=None, # example ['x25y41', 'x25y41']
):
"""
Search for matching tile files from local or S3 paths based on product metadata.
Optionally filters to a subset of specified tiles (e.g., ['x25y41', 'x25y41']).
"""
tiles_pattern = (
f"{product_dir}/"
f"{product}/"
f"{version}/"
"**/**/"
f"{time}--{freq}/"
f"{product}_*{time}--{freq}_{dataset_maturity}_{band}.tif"
)
# Return list of tiles on either S3 or local directory
if is_s3:
fs = s3fs.S3FileSystem(anon=True)
configure_rio(cloud_defaults=True, aws={"aws_unsigned": aws_unsigned})
tiles_list = fs.glob(tiles_pattern)
else:
tiles_list = glob.glob(tiles_pattern, recursive=True)
# Optionally filter list of tiles if requested
if list_tiles:
xy_patterns = [xy.replace("y", "/y") for xy in list_tiles] # i.e. from 'x25y41' to 'x25/y41'
tiles_list = [tile for tile in tiles_list if any(xy in tile for xy in xy_patterns)]
return tiles_list
[docs]
def make_cog_mosaic(
product,
band,
time,
freq,
version,
dataset_maturity,
product_dir,
output_dir,
cog_blocksize,
overview_count,
overview_resampling,
compression_algo,
compression_level,
aws_unsigned,
skip_existing,
list_tiles=None,
vsi_method="vsicurl",
):
"""
Generate a COG mosaic for a given tiled DEA product.
Products must follow the DEA Collection 3 naming
conventions and file structure:
https://knowledge.dea.ga.gov.au/guides/reference/collection_3_naming/
https://knowledge.dea.ga.gov.au/guides/reference/collection_3_summary_grid/
Parameters:
----------
product : str
The name of the DEA product (e.g. 'ga_ls_landcover_class_cyear_3').
band : str
The variable or band to extract.
time : int or str
The target time of the mosaic, year if annual summaries (e.g. '2023'),
year-month for seasonal (e.g. water observations nov_mar --> '2024-11')
freq : str
The frequency of the summary product (e.g. 'P1Y').
version : str
Product version (e.g. '2-0-0').
dataset_maturity : str
Dataset maturity stage. Usually: 'final'.
product_dir : str
S3 directory for the product. Usually 's3://dea-public-data/derivative/',
which is the DEA public bucket and derivates products folder,
which corresponds with https://data.dea.ga.gov.au/derivative/
HTTPS endpoint.
output_dir : str
local directory or s3 directory where to save output.
cog_blocksize : int or str
Size of internal COG tiling. Use 1024, unless there are specific reasons to
use a different value.
overview_count : int or str
Number of image overviews to generate.
Use 7 for 30 m resolution products (like Landsat),
use 8 for 10 m resolution products (like Sentinel-2).
overview_resampling : str
GDAL resampling method used when building overviews.
Options include (use all capital letters):
- 'MODE' for categorical data (e.g. land cover),
- 'BILINEAR' for continuous data,
- 'NEAREST' for narrow continuous data with many nodata pixels (e.g., coastal products).
compression_algo : str
GDAL compression algorithm used for output COG.
Use 'ZSTD', unless there are specific reasons to use a different algorithm.
compression_level : int or str
Level of compression of output COG.
Use 9, unless there are specific reasons to use a different level.
aws_unsigned : bool
Whether to sign AWS requests for S3 access.
skip_existing : bool
Whether to skip generation if output already exists.
list_tiles : list of strings, optional
List including tiles of interest to include in the output mosaic.
For example: ['x25y41', 'x25y41'].
Defaults to None --> use all tiles available.
vsi_method : str, optional
Whether to use "/vsicurl/" for HTTPS-based URIs in the interim
Virtual Raster ("vsicurl"), or "/vsis3/" for accessing data directly
from AWS S3 ("vsis3"). Default is "vsicurl".
Notes:
------
All other `gdal_translate` parameters are intentionally
omitted in this function. These options should be standardized
across all products, and are applied consistently as part of
downstream processing.
"""
# first log
log = logging.getLogger(__name__)
input_params = locals()
run_id = f"[{product}] [{version}] [{time}] [{band}]"
log.info(f"{run_id}: Using parameters {input_params}")
# Set product and output paths to plain strings
# TODO: refactor code to use pathlib throughout
product_dir = str(product_dir)
output_dir = str(output_dir)
# Determine if input data is located on S3
is_input_dir_s3 = _is_s3(product_dir)
is_out_dir_s3 = _is_s3(output_dir)
# Clean string to prepare for analysis
product_dir = product_dir.removeprefix("s3://")
product_dir = product_dir.rstrip("/")
log.info(f"{run_id}: Using input data product directory: {product_dir}")
# Determine output directory and file path following naming convention
# /derivative/<product_id>/<version>/continental_mosaics/<time>/<product_id>_mosaic_<time>_<band name>.tif
output_dir = output_dir.rstrip("/")
if is_out_dir_s3:
output_dir = f"{output_dir}/{product}/{version}/continental_mosaics/{time}--{freq}"
output_file_path = f"{output_dir}/{product}_mosaic_{time}--{freq}_{band}.tif"
log.info(f"{run_id} Output path: {output_file_path}")
else:
output_dir = os.path.join(output_dir, product, version, "continental_mosaics", f"{time}--{freq}")
output_file_path = os.path.join(output_dir, f"{product}_mosaic_{time}--{freq}_{band}.tif")
log.info(f"{run_id} Output path: {output_file_path}")
# Check if output file already exists
if is_out_dir_s3:
fs = s3fs.S3FileSystem(anon=aws_unsigned)
output_exists = fs.exists(output_file_path.removeprefix("s3://"))
else:
output_exists = os.path.exists(output_file_path)
# If output exists, either skip processing or overwrite
if output_exists:
if skip_existing:
log.info(
f"{run_id}: Output already exists at {output_file_path} and skip_existing=True. Skipping generation."
)
return
log.warning(f"{run_id}: Output already exists at {output_file_path} but skip_existing=False. Overwriting.")
else:
log.info(f"{run_id}: Output does not exist. Proceeding with mosaic generation.")
# Get list of paths
log.info(f"{run_id}: Finding data to mosaic")
tiles_list = _get_tiles(
product_dir,
product,
version,
time,
freq,
dataset_maturity,
band,
is_input_dir_s3,
aws_unsigned,
list_tiles=list_tiles,
)
# Use /vsicurl/ or /vsis3/ path for `gdalbuildvrt` compatibility
if vsi_method == "vsicurl":
tiles_list = [_get_vsicurlhttp_from_s3(tile) for tile in tiles_list]
elif vsi_method == "vsis3":
tiles_list = [_get_vsis3_from_s3(tile) for tile in tiles_list]
log.info(f"{run_id}: Number of tiles to mosaic: {len(tiles_list)}")
if len(tiles_list) > 0:
# Create a temporary directory to house files before syncing
with tempfile.TemporaryDirectory() as temp_dir:
log.info(f"{run_id}: Writing data to temporary folder: {temp_dir}")
# Output paths for intermediate files
file_list_name = os.path.join(temp_dir, f"{product}_{time}--{freq}_{band}_{version}.txt")
vrt_name = os.path.join(temp_dir, f"{product}_{time}--{freq}_{band}_{version}.vrt")
output_name = os.path.join(temp_dir, f"{product}_mosaic_{time}--{freq}_{band}.tif")
# Write list of files to a temporary text file, so it can be
# used as an input to `gdalbuildvrt`
with open(file_list_name, "w") as f:
for tile in tiles_list:
f.write(f"{tile}\n")
# Build VRT that will subsequently be used to generate COG
log.info(f"{run_id}: Building virtual raster (VRT)")
try:
subprocess.run(
["gdalbuildvrt", vrt_name, "-input_file_list", file_list_name],
check=True,
text=True,
stderr=subprocess.STDOUT,
)
except subprocess.CalledProcessError as e:
log.error(f"{run_id}: gdalbuildvrt failed with error: {e.stderr} {e.stdout}")
raise
# Convert VRT to Cloud Optimized GeoTIFF (COG)
log.info(f"{run_id}: Converting VRT to COG mosaic")
# fmt: off
try:
subprocess.run(
[
"gdal_translate",
vrt_name,
output_name,
"-co", "NUM_THREADS=ALL_CPUS", # Parallelisation
"-of", "COG", # Output format
"-co", "BIGTIFF=YES", # Allow large TIFFs
"-co", f"BLOCKSIZE={cog_blocksize}", # Tiling
"-co", "OVERVIEWS=IGNORE_EXISTING", # Force overview regen
"-co", f"OVERVIEW_RESAMPLING={overview_resampling}", # Resampling for overviews
"-co", f"OVERVIEW_COUNT={overview_count}", # Number of overviews
"-co", f"COMPRESS={compression_algo}", # Compression
"-co", f"LEVEL={compression_level}", # Compression level
"-co", "PREDICTOR=YES", # Compression predictor
],
check=True,
text=True,
stderr=subprocess.STDOUT,
)
except subprocess.CalledProcessError as e:
log.error(f"{run_id}: gdal_translate failed with error: {e.stderr} {e.stdout}")
raise
# fmt: on
# Copy output to S3
if is_out_dir_s3:
log.info(f"{run_id}: Writing COG to S3: {output_file_path}")
subprocess.run(
[
"aws",
"s3",
"cp",
"--only-show-errors",
"--acl",
"bucket-owner-full-control",
str(output_name),
str(output_file_path),
],
check=True,
)
# Copy locally to output folder
else:
log.info(f"{run_id}: Writing data locally: {output_file_path}")
os.makedirs(output_dir, exist_ok=True)
shutil.copy(output_name, output_file_path)
else:
log.info(f"{run_id}: No input tiles found")
@click.command()
@click.option(
"--product",
type=str,
required=True,
help="The name of the product to be mosaicked (e.g. 'ga_ls_landcover_class_cyear_3').",
)
@click.option(
"--band",
type=str,
required=True,
help="The variable or band to extract (e.g. 'level4')",
)
@click.option(
"--time",
type=str,
required=True,
help="The target time of the mosaic (e.g. '2022').",
)
@click.option(
"--freq",
type=str,
required=True,
help="The frequency of the summary product (e.g. 'P1Y').",
)
@click.option(
"--version",
type=str,
required=True,
help="The version number of the product (e.g. '2-0-0').",
)
@click.option(
"--dataset_maturity",
type=str,
default="final",
show_default=True,
help="Dataset maturity of the data to be mosaicked. Usually: 'final'.",
)
@click.option(
"--product_dir",
type=str,
default="s3://dea-public-data-dev/derivative/",
show_default=True,
help="The directory/location to read the tile COGs from; supports "
"both local disk and S3 locations. E.g. 's3://dea-public-data/derivative/', "
"corresponding to https://data.dea.ga.gov.au/derivative/.",
)
@click.option(
"--output_dir",
type=str,
required=True,
help="Local or S3 directory where to save output. "
"The function will add on `{product}/{version}/continental_mosaics/{time}--{freq}` ",
)
@click.option(
"--cog_blocksize",
type=int,
default=1024,
show_default=True,
help="Size of internal COG tiling. Use 1024, unless there are specific reasons to use a different value.",
)
@click.option(
"--overview_count",
type=int,
required=True,
help="Number of image overviews to generate. "
"Use 7 for 30m resolution products (e.g., Landsat), "
"or 8 for 10m resolution products (e.g., Sentinel-2).",
)
@click.option(
"--overview_resampling",
type=str,
required=True,
help="GDAL resampling method used for overviews. Use uppercase values: "
"'MODE' for categorical data (e.g., land cover), "
"'BILINEAR' for continuous data, "
"'NEAREST' for sparse/narrow continuous data.",
)
@click.option(
"--compression_algo",
type=str,
default="ZSTD",
show_default=True,
help="GDAL compression algorithm used for output COG."
"Use 'ZSTD', unless there are specific reasons to use a different algorithm.",
)
@click.option(
"--compression_level",
type=int,
default=9,
show_default=True,
help="Level of compression of output COG. Use '9', unless there are specific reasons to use a different level.",
)
@click.option(
"--aws_unsigned/--no-aws_unsigned",
is_flag=True,
default=True,
help="Whether to sign AWS requests for S3 access. Defaults to "
"True; can be set to False by passing `--no-aws_unsigned`.",
)
@click.option(
"--skip_existing/--no-skip_existing",
is_flag=True,
default=False,
show_default=True,
help="Skip generation if output already exists.Defaults to False",
)
@click.option(
"--list_tiles",
type=str,
required=False,
help="Comma-separated list of tiles to include in the mosaic. Example: x25y41,x26y42. "
"If omitted, all tiles will be used.",
)
def make_cog_mosaic_cli(
product,
band,
time,
freq,
version,
dataset_maturity,
product_dir,
output_dir,
cog_blocksize,
overview_count,
overview_resampling,
compression_algo,
compression_level,
aws_unsigned,
skip_existing,
list_tiles,
):
"""
CLI entry point for generating DEA COG mosaics from tiled datasets.
Passes user inputs to the core mosaic generation function.
"""
# Convert string of tiles to list
list_tiles = list_tiles.split(",") if list_tiles else None
# Run analysis function
make_cog_mosaic(
product,
band,
time,
freq,
version,
dataset_maturity,
product_dir,
output_dir,
cog_blocksize,
overview_count,
overview_resampling,
compression_algo,
compression_level,
aws_unsigned,
skip_existing,
list_tiles,
)
if __name__ == "__main__":
make_cog_mosaic_cli()