"""
Loading and processing Australian Bureau of Meteorology data.
License: The code in this notebook is licensed under the Apache License,
Version 2.0 (https://www.apache.org/licenses/LICENSE-2.0). Digital Earth
Australia data is licensed under the Creative Commons by Attribution 4.0
license (https://creativecommons.org/licenses/by/4.0/).
Contact: If you need assistance, please post a question on the Open Data
Cube Discord chat (https://discord.com/invite/4hhBQVas5U) or on the GIS Stack
Exchange (https://gis.stackexchange.com/questions/ask?tags=open-data-cube)
using the `open-data-cube` tag (you can view previously asked questions
here: https://gis.stackexchange.com/questions/tagged/open-data-cube).
If you would like to report an issue with this script, you can file one
on GitHub (https://github.com/GeoscienceAustralia/dea-notebooks/issues/new).
Last modified: March 2021
"""
import datetime
import pytz
import ciso8601
from types import SimpleNamespace
import requests
import lxml
import lxml.etree
[docs]
def get_stations(time=None,
observation='http://bom.gov.au/waterdata/services/parameters/Water Course Discharge',
url='http://www.bom.gov.au/waterdata/services'):
""" Get list of stations
:param time: tuple of datetime.datetime objects, or None to query from 1980-1-1 to Now
Returns
========
List of stations:
.name -- string, human readable station name
.pos -- Coordinate of the station or None
.url -- service url identifier
"""
data = tpl_get_stations.format(observation=observation,
**_fmt_time(time))
data = data.replace('\n', '')
rr = requests.post(url, data=data)
return _parse_station_data(rr.text)
[docs]
def get_station_data(station,
time=None,
observation='http://bom.gov.au/waterdata/services/parameters/Water Course Discharge',
url='http://www.bom.gov.au/waterdata/services'):
"""
Query Gauge Data.
:param station: One of the stations see get_stations
:param time: tuple of datetime.datetime objects, or None to query from 1980-1-1 to Now
Returns
========
Pandas dataframe with Timestamp(index), Value columns
"""
data = tpl_get_obs.format(station=station.url,
observation=observation,
**_fmt_time(time))
data = data.replace('\n', '')
rr = requests.post(url, data=data)
return _parse_get_data(rr.text)
[docs]
def mk_station_selector(on_select,
stations=None,
dst_map=None,
**kw):
"""
Add stations to the map and register on_click event.
:param on_select: Will be called when user selects station on the map `on_select(station)`
:param stations: List of stations as returned from get_stations
:param dst_map: Map to add stations markers to
Any other arguments are passed on to Map(..) constructor.
Returns
=======
(map, marker_cluster)
Passes through map=dst_map if not None, or returns newly constructed Map object.
"""
import ipyleaflet as L
if stations is None:
stations = get_stations()
stations = [st for st in stations if st.pos is not None]
pos2st = {st.pos: st for st in stations}
def on_click(event='', type='', coordinates=None):
pos = tuple(coordinates)
st = pos2st.get(pos)
if st is None:
# should probably log warning here
print("Can't map click to station")
return
on_select(st)
markers = [L.Marker(location=st.pos,
draggable=False,
title=st.name)
for st in stations]
cluster = L.MarkerCluster(markers=markers)
if dst_map is None:
dst_map = L.Map(**kw)
dst_map.add_layer(cluster)
cluster.on_click(on_click)
return dst_map, cluster
[docs]
def ui_select_station(stations,
zoom=3,
center=(-24, 138),
**kw):
"""
Create an interactive map for selecting river gauging stations.
"""
import ipywidgets as W
from IPython.display import display
import matplotlib.pyplot as plt
import ipyleaflet as L
from odc.ui import ui_poll
dbg_display = W.Output()
fig_display = W.Output()
btn_done = W.Button(description='Done')
scroll_wheel_zoom = kw.pop('scroll_wheel_zoom', True)
map_widget = L.Map(zoom=zoom,
center=center,
scroll_wheel_zoom=scroll_wheel_zoom,
**kw)
state = SimpleNamespace(pos=None,
gauge_data=None,
finished=False,
station=None)
plt_interactive_state = plt.isinteractive()
plt.interactive(False)
with fig_display:
fig, ax = plt.subplots(1, figsize=(14,4))
ax.set_visible(False)
display(fig)
def _on_select(station):
if state.finished:
print('Please re-run the cell')
return
state.station = station
state.pos = station.pos
state.gauge_data = None
print('Fetching data for: {}'.format(station.name))
try:
xx = get_station_data(station).dropna()
except Exception:
print('Failed to read data')
return
print('Got {} observations'.format(xx.shape[0]))
state.gauge_data = xx
with fig_display:
ax.clear()
ax.set_visible(True)
xx.plot(ax=ax)
ax.set_xlabel("Date")
ax.set_ylabel("Cubic meters per second")
ax.legend([station.name])
fig_display.clear_output(wait=True)
with fig_display:
display(fig)
def on_select(station):
with dbg_display:
_on_select(station)
def on_done(btn):
if state.finished:
with dbg_display:
print('Please re-run the cell')
return
state.finished = True
n_obs = 0 if state.gauge_data is None else state.gauge_data.shape[0]
with dbg_display:
print('''Finished
Station: {}
Number of Observations: {}'''.format(state.station.name, n_obs))
def on_poll():
with dbg_display:
if state.finished:
return state.gauge_data, state.station
return None
mk_station_selector(on_select,
stations=stations,
dst_map=map_widget)
## UI:
##
## MMMMMMMMMMMMM BBBBB
## MMMMMMMMMMMMM .....
## MMMMMMMMMMMMM .....
## MMMMMMMMMMMMM .....
## MMMMMMMMMMMMM .....
## FFFFFFFFFFFFFFFFFFF
## FFFFFFFFFFFFFFFFFFF
# M - Map F - Figure
# B - Button . - Debug output
btn_done.on_click(on_done)
r_panel = W.VBox([btn_done, dbg_display],
layout=W.Layout(width='30%'))
ui = W.VBox([W.HBox([map_widget, r_panel]),
fig_display])
display(ui)
result = ui_poll(on_poll, 1/20) # this will block until done is pressed
#restore interactive state
fig_display.clear_output(wait=True)
with fig_display:
plt.interactive(plt_interactive_state)
plt.show()
return result
def _fmt_time(time=None):
if time is None:
time = (datetime.datetime(1980, 1, 1),
datetime.datetime.now())
t_start, t_end = (t.isoformat() for t in time)
return dict(t_start=t_start, t_end=t_end)
def _parse_float(x):
if x is None:
return float('nan')
try:
return float(x)
except ValueError:
return float('nan')
def _parse_time(x):
t = ciso8601.parse_datetime(x).astimezone(pytz.utc)
return t.replace(tzinfo=None)
def _parse_get_data(text, raw=False):
root = lxml.etree.fromstring(text)
data = [[e.text for e in root.findall('.//{http://www.opengis.net/waterml/2.0}' + t)]
for t in ['time', 'value']]
dd = [(_parse_time(t),
_parse_float(v))
for t, v in zip(*data)]
if raw:
return dd
import pandas as pd
return pd.DataFrame(dd, columns=('Timestamp', 'Value')).set_index('Timestamp')
def _parse_station_data(text):
def parse_pos(pos):
if pos is None:
return None
return tuple(_parse_float(x)
for x in pos.split(' '))
root = lxml.etree.fromstring(text)
data = [[e.text for e in root.findall('.//{http://www.opengis.net/gml/3.2}' + t)]
for t in ['name', 'identifier', 'pos']]
return [SimpleNamespace(name=name, url=url, pos=parse_pos(pos))
for name, url, pos in zip(*data)]
# observation = 'http://bom.gov.au/waterdata/services/parameters/Water Course Discharge'
#
tpl_get_stations = '''
<soap12:Envelope xmlns:soap12="http://www.w3.org/2003/05/soap-envelope"
xmlns:sos="http://www.opengis.net/sos/2.0"
xmlns:wsa="http://www.w3.org/2005/08/addressing"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:ows="http://www.opengis.net/ows/1.1"
xmlns:fes="http://www.opengis.net/fes/2.0"
xmlns:gml="http://www.opengis.net/gml/3.2"
xmlns:swes="http://www.opengis.net/swes/2.0"
xsi:schemaLocation="http://www.w3.org/2003/05/soap-envelope http://www.w3.org/2003/05/soap-envelope/soap-envelope.xsd http://www.opengis.net/sos/2.0 http://schemas.opengis.net/sos/2.0/sos.xsd"
>
<soap12:Header>
<wsa:To>http://www.ogc.org/SOS</wsa:To>
<wsa:Action>http://www.opengis.net/def/serviceOperation/sos/foiRetrieval/2.0/GetFeatureOfInterest</wsa:Action>
<wsa:ReplyTo>
<wsa:Address>http://www.w3.org/2005/08/addressing/anonymous</wsa:Address>
</wsa:ReplyTo>
<wsa:MessageID>0</wsa:MessageID>
</soap12:Header>
<soap12:Body>
<sos:GetFeatureOfInterest service="SOS" version="2.0.0">
<sos:observedProperty>{observation}</sos:observedProperty>
<sos:temporalFilter>
<fes:During>
<fes:ValueReference>om:phenomenonTime</fes:ValueReference>
<gml:TimePeriod gml:id="tp1">
<gml:beginPosition>{t_start}</gml:beginPosition>
<gml:endPosition>{t_end}</gml:endPosition>
</gml:TimePeriod>
</fes:During>
</sos:temporalFilter>
</sos:GetFeatureOfInterest>
</soap12:Body>
</soap12:Envelope>
'''
# {station}, {observation}, {t_start}, {t_end}
tpl_get_obs = '''
<soap12:Envelope xmlns:soap12="http://www.w3.org/2003/05/soap-envelope"
xmlns:sos="http://www.opengis.net/sos/2.0"
xmlns:wsa="http://www.w3.org/2005/08/addressing"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:ows="http://www.opengis.net/ows/1.1"
xmlns:fes="http://www.opengis.net/fes/2.0"
xmlns:gml="http://www.opengis.net/gml/3.2"
xmlns:swes="http://www.opengis.net/swes/2.0"
xsi:schemaLocation="http://www.w3.org/2003/05/soap-envelope http://www.w3.org/2003/05/soap-envelope/soap-envelope.xsd http://www.opengis.net/sos/2.0 http://schemas.opengis.net/sos/2.0/sos.xsd"
>
<soap12:Header>
<wsa:To>http://www.ogc.org/SOS</wsa:To>
<wsa:Action>http://www.opengis.net/def/serviceOperation/sos/core/2.0/GetObservation</wsa:Action>
<wsa:ReplyTo>
<wsa:Address>http://www.w3.org/2005/08/addressing/anonymous</wsa:Address>
</wsa:ReplyTo>
<wsa:MessageID>0</wsa:MessageID>
</soap12:Header>
<soap12:Body>
<sos:GetObservation service="SOS" version="2.0.0">
<sos:procedure>http://bom.gov.au/waterdata/services/tstypes/Pat4_C_B_1_DailyMean</sos:procedure>
<sos:observedProperty>{observation}</sos:observedProperty>
<sos:featureOfInterest>{station}</sos:featureOfInterest>
<sos:temporalFilter>
<fes:During>
<fes:ValueReference>om:phenomenonTime</fes:ValueReference>
<gml:TimePeriod gml:id="tp1">
<gml:beginPosition>{t_start}</gml:beginPosition>
<gml:endPosition>{t_end}</gml:endPosition>
</gml:TimePeriod>
</fes:During>
</sos:temporalFilter>
</sos:GetObservation>
</soap12:Body>
</soap12:Envelope>
'''