diff --git a/.github/workflows/ml_training.yml b/.github/workflows/ml_training.yml index 32eb6b3..260cd6d 100644 --- a/.github/workflows/ml_training.yml +++ b/.github/workflows/ml_training.yml @@ -98,10 +98,14 @@ jobs: # station + basin index maps, and training_config.json. All artifacts # travel together; manifest.json carries sha256s + schema so the # mobile app can verify what it downloaded. See docs/INFERENCE.md. + # lstm_model_int8.tflite is shipped only if the int8 parity check in + # export_mobile.py passes; if it didn't ship, softprops/action-gh-release + # quietly skips the missing file. files: | lstm_model.h5 lstm_model.mlpackage.zip lstm_model.tflite + lstm_model_int8.tflite scalers.json station_index.json basin_index.json @@ -119,6 +123,7 @@ jobs: ./lstm_model.h5 ./lstm_model.mlpackage.zip ./lstm_model.tflite + ./lstm_model_int8.tflite ./scalers.json ./station_index.json ./basin_index.json diff --git a/docs/INFERENCE.md b/docs/INFERENCE.md index 7413bf0..758d487 100644 --- a/docs/INFERENCE.md +++ b/docs/INFERENCE.md @@ -10,7 +10,8 @@ Every successful weekly training run on `main` produces a GitHub Release tagged | --- | --- | | `lstm_model.h5` | Canonical Keras model. The source of truth; everything else is derived. | | `lstm_model.mlpackage.zip` | CoreML mlprogram for iOS (iOS 15+). Unzip and pass to `MLModel(contentsOf:)`. | -| `lstm_model.tflite` | TFLite for Android. May require the Flex delegate — see TFLite note below. | +| `lstm_model.tflite` | TFLite (float32) for Android. May require the Flex delegate — see TFLite note below. | +| `lstm_model_int8.tflite` | Optional dynamic-range int8 TFLite. Smaller, slightly less accurate. Only present when it passed the parity gate; see `manifest.tflite_int8_max_abs_diff`. | | `scalers.json` | Per-column scaler params (mean, scale, transform). Inputs and outputs are in scaled space; you invert with this. | | `station_index.json` | Map `site_id` → embedding index. Index `0` is reserved for "unseen". | | `basin_index.json` | Map HUC8 → embedding index. Index `0` is reserved for "unseen". | @@ -37,8 +38,8 @@ The model takes a dict of 5 inputs. Names are stable across exports: | Name | dtype | Shape | Source | | --- | --- | --- | --- | -| `encoder_input` | float32 | `[1, 60, len(encoder_features)]` | Last 60 days of features, columns in `training_config.encoder_features` order, pre-scaled. | -| `decoder_input` | float32 | `[1, 14, len(decoder_features)]` | Next 14 days of forecast-time-available features, columns in `training_config.decoder_features` order, pre-scaled. | +| `encoder_input` | float32 | `[1, 60, len(encoder_features)]` | Last 60 days of features, columns in `training_config.encoder_features` order, pre-scaled. Includes observed precipitation (`precipitation`, mm/day from GHCND PRCP). | +| `decoder_input` | float32 | `[1, 14, len(decoder_features)]` | Next 14 days of forecast-time-available features, columns in `training_config.decoder_features` order, pre-scaled. Includes forecast precipitation — pull `precipitation_sum` from Open-Meteo's daily forecast (`data/get_forecast.py` does this server-side; mobile clients can call Open-Meteo directly). | | `persistence_input` | float32 | `[1, len(target_features)]` | Last encoder-day flow values (the `Min Flow` / `Max Flow` columns from the last encoder row), in scaled space. | | `station_input` | int32 | `[1]` | `station_index.json[site_id]`, or `0` if the site isn't in the map. | | `basin_input` | int32 | `[1]` | `basin_index.json[huc8]`, or `0` if the HUC isn't in the map. | @@ -92,3 +93,18 @@ Both embedding layers reserve index `0` for unknown. If the user picks a site no ## Reference fixture The export parity tests in `tests/test_export_parity.py` generate synthetic inputs, run them through Keras / CoreML / TFLite, and assert they agree within `1e-3`. They're the executable spec of this document — when in doubt, read that file. + +## Publishing the first release + +The weekly cron at `.github/workflows/ml_training.yml` produces a Release tagged `model-YYYY.MM.DD` whenever it runs on `main`. To create the first release before the next Monday cron fires, after this branch is merged: + +```bash +gh workflow run ml_training.yml --ref main +gh run watch # follow until done +``` + +Verify the release appeared with all expected files (8 if int8 shipped, 7 otherwise) and a valid manifest: + +```bash +gh release view "model-$(date -u +%Y.%m.%d)" --json assets,name +``` diff --git a/openFlowML/combine_data.py b/openFlowML/combine_data.py index cdeb88f..3acdbbb 100644 --- a/openFlowML/combine_data.py +++ b/openFlowML/combine_data.py @@ -38,6 +38,12 @@ # Longest interior gap (in days) we are willing to interpolate across for flow # and temperature, which can change quickly day-to-day. MAX_GAP_DAYS = 7 +# Precipitation is far less amenable to interpolation than temperature: most +# days are dry, storms are spike events, and "average two dry days around a +# storm to fill the storm day" is meaningfully wrong. Use a very short interior +# interpolation limit (carries through ~2-day missing-data sensor outages) and +# then default to 0 ("no rain") for anything still missing. +MAX_PRECIP_GAP_DAYS = 2 # SWE changes slowly (snowpack accumulates/melts over weeks), so we tolerate # longer interior gaps in the SWE series before giving up on a value. MAX_SWE_GAP_DAYS = 30 @@ -94,7 +100,11 @@ def merge_dataframes(noaa_data, flow_data, site_id, start_date, end_date, ) flow_daily = _to_daily_series(flow_data, ['Min Flow', 'Max Flow'], daily_index) - noaa_daily = _to_daily_series(noaa_data, ['TMIN', 'TMAX'], daily_index) + # NOAA fetch now returns precipitation alongside TMIN/TMAX (get_noaa.py + # surfaces GHCND PRCP as `precipitation` in mm). The column may still + # be absent if the NCEI station had no PRCP coverage at all. + noaa_daily = _to_daily_series( + noaa_data, ['TMIN', 'TMAX', 'precipitation'], daily_index) combined = pd.concat([flow_daily, noaa_daily], axis=1) for col in CORE_COLUMNS: @@ -104,8 +114,17 @@ def merge_dataframes(noaa_data, flow_data, site_id, start_date, end_date, # Time-aware interpolation of short interior gaps only (limit_area # 'inside' means no edge extrapolation). `combined` is a single station, - # so this interpolation never bleeds across station boundaries. - combined = combined.interpolate(method='time', limit=MAX_GAP_DAYS, limit_area='inside') + # so this interpolation never bleeds across station boundaries. We + # interpolate the core flow/temp columns at MAX_GAP_DAYS and handle + # precipitation separately just below with a much shorter limit + # (interpolation across a missed storm day is misleading). + combined[CORE_COLUMNS] = combined[CORE_COLUMNS].interpolate( + method='time', limit=MAX_GAP_DAYS, limit_area='inside') + if 'precipitation' in combined.columns: + combined['precipitation'] = pd.to_numeric( + combined['precipitation'], errors='coerce') + combined['precipitation'] = combined['precipitation'].interpolate( + method='time', limit=MAX_PRECIP_GAP_DAYS, limit_area='inside') # SWE: separate, slow-varying series; longer interpolation limit. if swe_data is not None and not swe_data.empty: @@ -159,9 +178,17 @@ def merge_dataframes(noaa_data, flow_data, site_id, start_date, end_date, combined['reservoir_release'] = float('nan') # Drop rows still missing any core flow/temp value. SWE / soil_moisture - # are NOT in CORE_COLUMNS, so a missing one never drops a row. + # / precipitation are NOT in CORE_COLUMNS, so a missing one never + # drops a row. before = len(combined) combined = combined.dropna(subset=CORE_COLUMNS) + # Precipitation: 0 mm means "no measurable rain", a legitimate default + # for any post-interpolation gap (NCEI station with no PRCP coverage, + # short outage that exceeded MAX_PRECIP_GAP_DAYS, etc.). Most days in + # most basins are zero anyway, so this defaults to the modal value. + if 'precipitation' not in combined.columns: + combined['precipitation'] = 0.0 + combined['precipitation'] = combined['precipitation'].fillna(0.0) # SWE: 0 means "no snow", which IS a legitimate default; keep that. combined['SWE'] = combined['SWE'].fillna(0.0) # Soil moisture: 0 means "Sahara desert", which is NOT a legitimate diff --git a/openFlowML/data/cbrfc_lid_map.json b/openFlowML/data/cbrfc_lid_map.json index e459cc2..44ed81e 100644 --- a/openFlowML/data/cbrfc_lid_map.json +++ b/openFlowML/data/cbrfc_lid_map.json @@ -1,3 +1,3 @@ { - "_comment": "Map of OpenFlow site_id (USGS:XXXX or DWR:XXXX) to NWS / AHPS 5-letter LID. Used by get_cbrfc.py for both live AHPS forecast retrieval and NWPS historical forecast archive lookup. Add a row when wiring CBRFC for a new gauge; a missing site_id silently skips the CBRFC baseline for that station (it remains a fully optional comparison baseline). Find the LID for a USGS site by searching https://water.weather.gov/ahps2/ for the gauge name and reading the LID from the page URL." + "_comment": "Map of OpenFlow site_id (USGS:XXXX or DWR:XXXX) to NWS / AHPS 5-letter LID. Used by data/get_cbrfc.py for both the live AHPS forecast and the historical NWPS forecast archive. To wire a new gauge, run: python -m data.find_lid --site-id USGS:09163500 -- this prints the closest NWS gauges via the NWPS API. Copy the chosen LID into this file and re-run training. A missing site_id silently skips the CBRFC baseline for that station; CBRFC is an optional comparison baseline, not a training input." } diff --git a/openFlowML/data/find_lid.py b/openFlowML/data/find_lid.py new file mode 100644 index 0000000..43ae26e --- /dev/null +++ b/openFlowML/data/find_lid.py @@ -0,0 +1,142 @@ +""" +Helper: find candidate NWS / AHPS LIDs near a given latitude/longitude. + +The CBRFC baseline (data/get_cbrfc.py) needs a mapping site_id -> NWS LID. +Mappings are curated by hand in data/cbrfc_lid_map.json because there's no +algorithmic equivalence between a USGS site number and an AHPS LID. This +script queries the NWPS gauge index for gauges within a radius and prints +their LID + name + distance so a maintainer can pick the right one. + +Usage: + python -m data.find_lid --lat 38.52 --lon -106.96 + python -m data.find_lid --site-id USGS:09163500 # auto-resolves coords + +Output is a ranked list; the closest gauge by haversine is usually the right +match for a flow-monitoring gauge near the LID's location. Copy the chosen +mapping into data/cbrfc_lid_map.json and re-run train.py to enable CBRFC +backtesting for that site. +""" + +import argparse +import logging +import math +import sys +from typing import List, Optional + +from data.utils import data_utils + +logger = logging.getLogger(__name__) +if not logging.getLogger().hasHandlers(): + logging.basicConfig(level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s') + +# NWPS publishes a gauges index; lat-bbox filter keeps the response small. +NWPS_GAUGES_URL = "https://api.water.noaa.gov/nwps/v1/gauges" + + +def _haversine(lat1, lon1, lat2, lon2): + """Great-circle distance in km.""" + R = 6371.0 + lat1r, lat2r = math.radians(lat1), math.radians(lat2) + dlat = math.radians(lat2 - lat1) + dlon = math.radians(lon2 - lon1) + a = math.sin(dlat / 2) ** 2 + math.cos(lat1r) * math.cos(lat2r) * math.sin(dlon / 2) ** 2 + return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) + + +def find_nearest_lids(lat: float, lon: float, *, + bbox_deg: float = 0.5, top_n: int = 5) -> List[dict]: + """ + Return up to `top_n` NWS gauges nearest to (lat, lon) within a + ±bbox_deg lat/lon window. Each entry has {lid, name, lat, lon, + distance_km, rfc}. + """ + params = { + # NWPS supports bbox filtering as a comma-joined "minLon,minLat,maxLon,maxLat". + 'bbox': f"{lon - bbox_deg},{lat - bbox_deg},{lon + bbox_deg},{lat + bbox_deg}", + } + response = data_utils.request_with_retry(NWPS_GAUGES_URL, params=params) + if response is None: + logger.error("NWPS gauge search returned no response") + return [] + try: + payload = response.json() + except ValueError: + logger.error("NWPS returned non-JSON") + return [] + gauges = payload.get('gauges') or payload.get('data') or [] + enriched = [] + for g in gauges: + try: + g_lat = float(g.get('latitude') or g.get('lat')) + g_lon = float(g.get('longitude') or g.get('lon')) + except (TypeError, ValueError): + continue + enriched.append({ + 'lid': g.get('lid') or g.get('id'), + 'name': g.get('name') or g.get('siteName', ''), + 'lat': g_lat, + 'lon': g_lon, + 'distance_km': _haversine(lat, lon, g_lat, g_lon), + 'rfc': g.get('rfc') or g.get('forecastCenter', ''), + }) + enriched.sort(key=lambda x: x['distance_km']) + return enriched[:top_n] + + +def _coords_for_site_id(site_id: str) -> Optional[tuple]: + """Resolve an OpenFlow site_id (USGS:XXX or DWR:XXX) to (lat, lon) via the existing coordinate utilities.""" + from data.utils.get_coordinates import get_usgs_coordinates, get_dwr_coordinates + prefix, _, sid = site_id.partition(':') + if prefix == 'USGS': + coords = get_usgs_coordinates(sid) + elif prefix == 'DWR': + coords = get_dwr_coordinates(sid) + else: + logger.error("Unsupported prefix in %s", site_id) + return None + if not coords: + return None + return float(coords['latitude']), float(coords['longitude']) + + +def main(): + parser = argparse.ArgumentParser(description=__doc__) + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument('--site-id', help='OpenFlow site_id like USGS:09163500') + group.add_argument('--latlon', nargs=2, type=float, metavar=('LAT', 'LON'), + help='Direct lat lon') + parser.add_argument('--top-n', type=int, default=5) + parser.add_argument('--bbox-deg', type=float, default=0.5, + help='Search half-window in degrees (default 0.5 ~= 55 km)') + args = parser.parse_args() + + if args.site_id: + coords = _coords_for_site_id(args.site_id) + if not coords: + logger.error("Could not resolve coordinates for %s", args.site_id) + return 1 + lat, lon = coords + print(f"# Site {args.site_id} -> ({lat}, {lon})") + else: + lat, lon = args.latlon + + hits = find_nearest_lids(lat, lon, bbox_deg=args.bbox_deg, top_n=args.top_n) + if not hits: + print("# No NWS gauges found within the bbox; widen --bbox-deg.") + return 1 + print(f"# Top {len(hits)} NWS gauges near ({lat:.4f}, {lon:.4f}):") + print("# LID distance_km name RFC") + for h in hits: + print(f" {h['lid']:6s} {h['distance_km']:>10.2f} " + f"{(h['name'] or '')[:40]:40s} {h['rfc']}") + if args.site_id: + top = hits[0] + print() + print(f"# To wire the closest match, append to data/cbrfc_lid_map.json:") + print(f' "{args.site_id}": "{top["lid"]}"') + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/openFlowML/data/get_forecast.py b/openFlowML/data/get_forecast.py index 26fc001..0a90312 100644 --- a/openFlowML/data/get_forecast.py +++ b/openFlowML/data/get_forecast.py @@ -7,18 +7,21 @@ from data.utils import data_utils """ -14-day daily temperature forecast for a point. - -The model design (see the wiki + the Phase 2 plan) needs forecast TMIN/TMAX as -inputs to the model's forecast window. The official NWS api.weather.gov -forecast caps at ~7 days, so we fetch from Open-Meteo's free public API, -which serves GFS-based daily forecasts out to 16 days -- this is the -"GFS / outlooks to reach 14 days" path the user picked. - -Training uses *actual* (shifted) historical TMIN/TMAX in the forecast window; -this module is what an inference run will call. Importantly, this is point -data only -- soil moisture / SWE are current-conditions features, not things -this fetcher produces. +14-day daily TMIN/TMAX + precipitation forecast for a point. + +The model design (see the wiki + the Phase 2 plan) needs forecast TMIN/TMAX +*and* daily precipitation as inputs to the model's forecast window. The +official NWS api.weather.gov forecast caps at ~7 days, so we fetch from +Open-Meteo's free public API, which serves GFS-based daily forecasts out to +16 days -- this is the "GFS / outlooks to reach 14 days" path the user picked. + +Training uses *actual* (shifted) historical TMIN/TMAX/precipitation in the +forecast window; this module is what an inference run will call. Importantly, +this is point data only -- soil moisture / SWE are current-conditions +features, not things this fetcher produces. + +Units: TMIN/TMAX in Fahrenheit (matches GHCND TMIN/TMAX), precipitation in +millimetres (matches GHCND PRCP after combine_data's tenths-of-mm conversion). """ if not logging.getLogger().hasHandlers(): @@ -33,21 +36,23 @@ def _empty_forecast(): - return pd.DataFrame(columns=['Date', 'TMIN', 'TMAX']) + return pd.DataFrame(columns=['Date', 'TMIN', 'TMAX', 'precipitation']) def get_open_meteo_forecast(lat, lon, days=DEFAULT_HORIZON_DAYS, temperature_unit='fahrenheit'): """ - Fetch a daily TMIN/TMAX forecast from Open-Meteo for the next `days` days - starting today. Default unit is Fahrenheit to match the GHCND TMIN/TMAX - feature scale used during training. + Fetch a daily TMIN/TMAX + precipitation forecast from Open-Meteo for the + next `days` days starting today. Default temperature unit is Fahrenheit to + match the GHCND TMIN/TMAX feature scale used during training. Precipitation + is in mm (Open-Meteo's `precipitation_sum` default), matching the GHCND + PRCP unit after combine_data's tenths-of-mm conversion. """ if days < 1 or days > MAX_HORIZON_DAYS: raise ValueError(f"days must be in 1..{MAX_HORIZON_DAYS}") params = { "latitude": lat, "longitude": lon, - "daily": "temperature_2m_max,temperature_2m_min", + "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum", "forecast_days": days, "temperature_unit": temperature_unit, "timezone": "America/Denver", @@ -65,16 +70,28 @@ def get_open_meteo_forecast(lat, lon, days=DEFAULT_HORIZON_DAYS, temperature_uni dates = daily.get('time') or [] tmax = daily.get('temperature_2m_max') or [] tmin = daily.get('temperature_2m_min') or [] + precip = daily.get('precipitation_sum') or [] if not dates or len(dates) != len(tmax) or len(dates) != len(tmin): logger.error("Unexpected Open-Meteo payload shape") return _empty_forecast() + # precipitation_sum is the newest of the three fields requested; tolerate + # an older Open-Meteo deployment that doesn't return it by zero-filling. + if len(precip) != len(dates): + logger.warning("Open-Meteo returned no precipitation_sum; defaulting to 0") + precip = [0.0] * len(dates) - return pd.DataFrame({'Date': dates, 'TMIN': tmin, 'TMAX': tmax}) + return pd.DataFrame({ + 'Date': dates, + 'TMIN': tmin, + 'TMAX': tmax, + 'precipitation': precip, + }) def get_forecast(lat, lon, days=DEFAULT_HORIZON_DAYS): """ - Public API: a 14-day daily TMIN/TMAX forecast for (lat, lon). + Public API: a 14-day daily TMIN/TMAX + precipitation forecast for + (lat, lon). Currently always Open-Meteo (GFS-backed). Kept as a thin wrapper so a future api.weather.gov-primary / Open-Meteo-fallback blend can drop in diff --git a/openFlowML/data/get_noaa.py b/openFlowML/data/get_noaa.py index 584434d..a9bcd53 100644 --- a/openFlowML/data/get_noaa.py +++ b/openFlowML/data/get_noaa.py @@ -11,16 +11,35 @@ import sys import logging -# given a coordinate, find closest NOAA station -# tests a single NOAA station to get historical temperature data -# handle errors accordingly if data is not available -# NOAA station may not have current daily data so script wiil find one with recent data -# TODO: need better ways to check completness of numeric temperature data before deciding it's good data +# Given a coordinate, find the closest NOAA GHCND station that actually has +# usable daily TMIN/TMAX/PRCP data over the requested window. +# +# We gate stations in two passes: +# 1. NCEI metadata pass: `datacoverage > 0.87` and a date range that brackets +# our window. This is fast (one metadata call per candidate) but is a +# coverage promise across the station's entire history -- a station with +# 88% lifetime coverage can still be missing the last 6 months. +# 2. Realized-data pass: after `fetch_temperature_data` returns, we measure +# the actual non-NaN fraction of TMIN/TMAX over the requested window +# and reject the station if it drops below MIN_OBSERVED_FRACTION. This is +# what the old "TODO: better completeness checks" comment was about -- +# `find_station_with_recent_data` could happily return a station whose +# daily data was 60% NaN once you actually downloaded it. +# +# PRCP is reported by NCEI in tenths of mm; we divide by 10 to land in mm, +# the same unit Open-Meteo serves for precipitation_sum. Country = 'US' noaa_api_token = "ensQWPauKcbtSOmsAvlwRVfWyQjJpbHa" # not sensitive or currently used headers = {"token": noaa_api_token} -fileds = ["TMIN","TMAX"] +# TMIN / TMAX are required (data_utils.CORE_REQUIRED). PRCP joins as an +# optional column -- if the closest station's PRCP is too gappy we still +# accept the station for temperature and combine_data will fill PRCP with 0. +fileds = ["TMIN", "TMAX", "PRCP"] +REQUIRED_FIELDS = ("TMIN", "TMAX") +# Stations with realized coverage below this on any required field are +# rejected so we walk down to the next candidate. +MIN_OBSERVED_FRACTION = 0.7 # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') @@ -209,7 +228,7 @@ def fetch_temperature_data(nearest_station_id, start_str, end_str): "dataset": "daily-summaries", "startDate": start_str + "T00:00:00", "endDate": end_str + "T00:00:00", - "dataTypes": "TMIN,TMAX", + "dataTypes": "TMIN,TMAX,PRCP", "stations": nearest_station_id, } @@ -223,7 +242,7 @@ def fetch_temperature_data(nearest_station_id, start_str, end_str): if response_text: response_text = "\n".join([line.strip().replace('"', '') for line in response_text.splitlines()]) reader = csv.DictReader(io.StringIO(response_text)) - + # Convert reader output to a list of dictionaries and then to a DataFrame temperature_data_list = list(reader) temperature_df = pd.DataFrame(temperature_data_list) @@ -231,11 +250,19 @@ def fetch_temperature_data(nearest_station_id, start_str, end_str): # Rename 'STATION' column to 'NOAA_station' temperature_df.rename(columns={'STATION': 'NOAA_station'}, inplace=True) - # Replace empty strings with NaN and convert to numeric for TMAX and TMIN - temperature_df['TMAX'].replace('', np.nan, inplace=True) - temperature_df['TMIN'].replace('', np.nan, inplace=True) - temperature_df['TMAX'] = pd.to_numeric(temperature_df['TMAX'], errors='coerce') - temperature_df['TMIN'] = pd.to_numeric(temperature_df['TMIN'], errors='coerce') + # Coerce TMIN/TMAX (already in degrees in NCEI daily-summaries) and + # PRCP (tenths of mm in GHCND; we convert to mm to match Open-Meteo + # precipitation_sum). Missing values come through as empty strings. + for col in ('TMAX', 'TMIN'): + if col in temperature_df.columns: + temperature_df[col].replace('', np.nan, inplace=True) + temperature_df[col] = pd.to_numeric(temperature_df[col], errors='coerce') + if 'PRCP' in temperature_df.columns: + temperature_df['PRCP'].replace('', np.nan, inplace=True) + temperature_df['PRCP'] = pd.to_numeric(temperature_df['PRCP'], errors='coerce') / 10.0 + # Surface PRCP under combine_data's `precipitation` column name so + # downstream code doesn't need to know about the GHCND code. + temperature_df.rename(columns={'PRCP': 'precipitation'}, inplace=True) # Convert the 'DATE' column to datetime but do not set it as index temperature_df['Date'] = pd.to_datetime(temperature_df['DATE'], format="%Y-%m-%d") @@ -247,25 +274,106 @@ def fetch_temperature_data(nearest_station_id, start_str, end_str): else: logging.error("Could not get temperature data!") return pd.DataFrame() # Return an empty DataFrame on error - + + +def _realized_coverage_ok(df, fields, min_fraction): + """ + Validate that a station's actual returned data isn't mostly NaN on the + fields we require. Returns True iff every field in `fields` is non-NaN + on at least `min_fraction` of the returned rows. + + Why this exists: NCEI metadata `datacoverage` is a lifetime average; a + station with 88% lifetime coverage can still be missing the last 6 months + of TMIN. Without this gate the model trains on a column that's mostly NaN + and gap-filled to 0, which silently destroys temperature signal. + """ + if df is None or df.empty: + return False + n = len(df) + for field in fields: + if field not in df.columns: + return False + present = df[field].notna().sum() + if present / n < min_fraction: + logging.info("Realized coverage for %s = %.2f < %.2f, rejecting", + field, present / n, min_fraction) + return False + return True + + def main(latitude=38.52, longitude=-106.96, startStr=None, endStr=None): if startStr is None: startStr = (datetime.now() - timedelta(days=7*365 + 7)).strftime('%Y-%m-%d') if endStr is None: endStr = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d') - nearest_station_id = find_closest_ghcnd_station(float(latitude), float(longitude), fileds, startStr, endStr) - - if nearest_station_id: - logging.info(f"Nearest station ID with good data: {nearest_station_id[0]}") - temperature_data = fetch_temperature_data(nearest_station_id[0], startStr, endStr) - logging.info(temperature_data) - return nearest_station_id[0], temperature_data - else: - logging.error("No station found near the specified location!") - # Return empty results rather than exiting: a single station with no - # nearby NOAA data must not abort the whole training run. - return None, pd.DataFrame() + # Build a ranked list of candidate stations once; the metadata pass inside + # find_closest_ghcnd_station gates on lifetime datacoverage + field + # availability. We then walk the candidates ourselves and apply a second + # realized-data gate (MIN_OBSERVED_FRACTION on TMIN/TMAX over the actual + # returned window) so a station with stale-but-present metadata can't slip + # past with mostly-NaN data. + candidate = find_closest_ghcnd_station( + float(latitude), float(longitude), fileds, startStr, endStr) + tried = [] + while candidate is not None: + station_id = candidate[0] + if station_id in tried: # paranoia: never loop forever + break + tried.append(station_id) + logging.info("Trying station %s (distance %.2f km)", station_id, candidate[1]) + temperature_data = fetch_temperature_data(station_id, startStr, endStr) + if _realized_coverage_ok(temperature_data, REQUIRED_FIELDS, MIN_OBSERVED_FRACTION): + logging.info("Accepted station %s (realized coverage passes %.0f%% gate)", + station_id, MIN_OBSERVED_FRACTION * 100) + return station_id, temperature_data + # Walk down the candidate list -- ask find_closest_ghcnd_station for + # the next-best station EXCLUDING the ones we've already tried. + logging.info("Station %s failed realized-coverage gate; trying next", station_id) + candidate = _next_candidate(latitude, longitude, fileds, + startStr, endStr, exclude=tried) + logging.error("No station found near the specified location with good realized coverage!") + # Return empty results rather than exiting: a single station with no + # nearby NOAA data must not abort the whole training run. + return None, pd.DataFrame() + + +def _next_candidate(latitude, longitude, fields, startStr, endStr, exclude): + """ + Re-run the closest-station search excluding stations we already tried. + + Cheap to re-issue: the GHCND station list is cached at the HTTP layer and + the metadata calls are per-station with 0.3s sleeps. We accept the extra + latency in exchange for not having to refactor find_closest_ghcnd_station + into a generator. + """ + return _find_closest_ghcnd_station_excluding(latitude, longitude, fields, + startStr, endStr, exclude) + + +def _find_closest_ghcnd_station_excluding(latitude, longitude, fields, + startStr, endStr, exclude): + """Same as find_closest_ghcnd_station but skips stations in `exclude`.""" + stations_url = "https://www.ncei.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt" + response_text = get_data(stations_url) + if response_text is None: + return None + us_stations = [line for line in response_text.splitlines() if line.startswith(Country)] + pattern = re.compile(r"US[a-zA-Z0-9_]{6}\d+") + stations_with_distances = [] + for line in us_stations: + station_id, lat, lon, *_ = line.split() + if not pattern.match(station_id) or station_id in exclude: + continue + lat, lon = float(lat), float(lon) + distance = haversine_distance(latitude, longitude, lat, lon) + stations_with_distances.append((station_id, distance)) + sorted_stations = sorted(stations_with_distances, key=lambda x: x[1])[:50] + for station, distance in sorted_stations: + result = find_station_with_recent_data([(station, distance)], startStr, fields, endStr) + if result: + return result + return None if __name__ == "__main__": if len(sys.argv) > 4: # Check if enough arguments are passed diff --git a/openFlowML/data/get_s2f.py b/openFlowML/data/get_s2f.py index 9b7921e..eb0f140 100644 --- a/openFlowML/data/get_s2f.py +++ b/openFlowML/data/get_s2f.py @@ -35,8 +35,8 @@ """ import logging -from datetime import date, datetime -from typing import Optional +from datetime import date, datetime, timedelta +from typing import Mapping, Optional import numpy as np import pandas as pd @@ -45,6 +45,11 @@ if not logging.getLogger().hasHandlers(): logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +# Conversion factor used by disaggregate_seasonal_to_daily: 1 kAF/day in cfs. +# Derivation: 1 acre-foot = 43,560 ft^3; 1 day = 86,400 s; so 1 AF/day = +# 43,560 / 86,400 cfs = 0.50417 cfs. 1,000 AF/day = 504.17 cfs. +_CFS_PER_KAF_PER_DAY = 43_560.0 * 1_000.0 / 86_400.0 + def _empty() -> pd.DataFrame: return pd.DataFrame(columns=['Date', 's2f_volume_kaf']) @@ -67,22 +72,77 @@ def fetch(site_id: str, anchor_date) -> pd.DataFrame: Currently stubbed: USBR S2F doesn't expose a clean REST archive, so this always returns empty until the per-basin scraper is built. The function signature is the integration point so combine_data / baselines can adopt - the real fetch without changing callers. + the real fetch without changing callers. When wired, baseline_predictions + will call disaggregate_seasonal_to_daily below to produce daily-cadence + predictions in cfs. """ logger.debug("S2F fetch stubbed for %s @ %s -- archive integration pending", site_id, _to_date(anchor_date)) return _empty() +def disaggregate_seasonal_to_daily(volume_kaf: float, + season_start: date, + season_end: date, + anchor_date, + horizon_days: int, + climatology: Optional[Mapping] = None) -> np.ndarray: + """ + Convert a seasonal-volume forecast (e.g. "April-July runoff = 850 kAF") + into a daily-cfs prediction series for the `horizon_days` after + `anchor_date`. + + Approach: split the seasonal volume across days inside [season_start, + season_end] using a per-day-of-year share, then convert each daily + kAF/day allocation to cfs via _CFS_PER_KAF_PER_DAY. Days outside the + forecast season get the climatological out-of-season flow share (default: + zero contribution from the seasonal forecast; the persistence baseline + already covers the rest). + + Args: + volume_kaf: seasonal total in thousand acre-feet + season_start: first day inside the forecast season + season_end: last day inside the forecast season (inclusive) + anchor_date: first day of the prediction window (forecast issue + date + 1, typically) + horizon_days: number of daily predictions to return + climatology: optional dict {day_of_year -> fraction_of_seasonal_volume}; + must sum to ~1.0 over the days in the season. + When None, falls back to a uniform share across the + season (1 / season_length per in-season day) -- the + honest minimum-information baseline. + + Returns: ndarray of shape (horizon_days,) in cfs. + """ + anchor = _to_date(anchor_date) + if season_end < season_start: + raise ValueError("season_end must be on or after season_start") + season_length = (season_end - season_start).days + 1 + out = np.zeros(horizon_days, dtype='float32') + for offset in range(horizon_days): + day = anchor + timedelta(days=offset) + if day < season_start or day > season_end: + continue + if climatology is None: + share = 1.0 / season_length + else: + doy = day.timetuple().tm_yday + share = float(climatology.get(doy, 1.0 / season_length)) + daily_kaf = volume_kaf * share + out[offset] = daily_kaf * _CFS_PER_KAF_PER_DAY + return out + + def baseline_predictions(test_samples) -> Optional[np.ndarray]: """ Per-sample S2F-derived daily prediction aligned with WindowedSample test items. Returns (N, horizon, target_features) on the raw flow scale when feasible, or None when the S2F archive isn't wired in (current state). - See the module docstring for the daily-disaggregation approach this - expects when fully implemented; until then it's a no-op returning None - so train.py reports "S2F: not available" and moves on. + The disaggregation math lives in disaggregate_seasonal_to_daily and is + ready to use; the missing piece is fetch() returning real data and a + per-site daily-share climatology lookup. Until then this is a no-op + returning None so train.py reports "S2F: not available" and moves on. """ if not test_samples: return None @@ -92,6 +152,8 @@ def baseline_predictions(test_samples) -> Optional[np.ndarray]: found += 1 if found == 0: return None - # The disaggregation step (seasonal kAF -> daily cfs via per-site - # climatological share) belongs here once fetch() returns real data. + # When fetch() returns real data, the loop above would build a + # (N, horizon, target_features) tensor by calling + # disaggregate_seasonal_to_daily for each sample and tiling the cfs + # value across both Min Flow + Max Flow target columns. return None diff --git a/openFlowML/data/get_vegdri.py b/openFlowML/data/get_vegdri.py deleted file mode 100644 index b8abe14..0000000 --- a/openFlowML/data/get_vegdri.py +++ /dev/null @@ -1,139 +0,0 @@ -import requests -import argparse -import pandas as pd -import logging -from datetime import datetime, timedelta -import json -from shapely.geometry import Polygon, Point -import geopandas as gpd -import rasterio -from rasterio.mask import mask -from data.utils.get_poly import get_huc8_polygon, simplify_polygon - -# Configure logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - -def get_vegdri_data(geometry, date): - """ - Get VegDRI data for a given geometry and date. - """ - if not (geometry and date): - logging.error("Missing required parameters") - return None - - try: - geometry_json = json.loads(geometry) - except ValueError: - logging.error("Invalid geometry. Must be a GeoJSON object") - return None - - if geometry_json['type'] not in ['Point', 'Polygon']: - logging.error("Only Point and Polygon geometry types are supported") - return None - - # Convert geometry to shapely object - if geometry_json['type'] == 'Point': - shape = Point(geometry_json['coordinates']) - else: # Polygon - shape = Polygon(geometry_json['coordinates'][0]) - - # Get the bounding box - minx, miny, maxx, maxy = shape.bounds - - # Set up the request parameters - base_url = "https://vegdri.cr.usgs.gov/arcgis/rest/services/VegDRI/VegDRI_Current/ImageServer/exportImage" - - params = { - "bbox": f"{minx},{miny},{maxx},{maxy}", - "bboxSR": 4326, - "size": "500,500", - "imageSR": 4326, - "time": date, - "format": "tiff", - "pixelType": "F32", - "noData": "", - "noDataInterpretation": "esriNoDataMatchAny", - "interpolation": "RSP_BilinearInterpolation", - "compression": "", - "compressionQuality": 100, - "bandIds": "", - "mosaicRule": "", - "renderingRule": "", - "f": "json" - } - - logging.info(f"Fetching VegDRI data from: {base_url} with params: {params}") - try: - response = requests.get(base_url, params=params) - print(response.url) - except requests.exceptions.RequestException as e: - logging.error(f"Error making request: {e}") - return None - - if response.status_code == 200: - try: - data = response.json() - if 'href' in data: - # Download the TIFF file - tiff_response = requests.get(data['href']) - if tiff_response.status_code == 200: - # Save the TIFF file - with open('vegdri_data.tiff', 'wb') as f: - f.write(tiff_response.content) - logging.info("VegDRI data saved as 'vegdri_data.tiff'") - - # Read the TIFF file and extract data for the geometry - with rasterio.open('vegdri_data.tiff') as src: - out_image, out_transform = mask(src, [shape], crop=True) - out_meta = src.meta.copy() - out_meta.update({"driver": "GTiff", - "height": out_image.shape[1], - "width": out_image.shape[2], - "transform": out_transform}) - - # Convert the masked data to a pandas DataFrame - df = pd.DataFrame(out_image[0].flatten(), columns=['VegDRI']) - df['latitude'], df['longitude'] = rasterio.transform.xy(out_transform, - range(out_meta['height']), - range(out_meta['width'])) - return df - else: - logging.error("Failed to download TIFF file") - return None - else: - logging.error("No image URL in the response") - return None - except requests.exceptions.JSONDecodeError: - logging.error("Failed to parse JSON response") - return None - else: - logging.error(f"Error: {response.status_code} - {response.text}") - return None - -def main(lat, lon, date): - huc8_polygon = get_huc8_polygon(lat, lon) - if huc8_polygon: - simplified_polygon = simplify_polygon(huc8_polygon) - logging.debug(f"Simplified polygon: {simplified_polygon}") - geometry = json.dumps({ - "type": "Polygon", - "coordinates": [simplified_polygon] - }) - data = get_vegdri_data(geometry, date) - if data is not None: - logging.info(f"Received VegDRI data: {data.head()}") - return data - else: - logging.error("No data received") - return None - else: - logging.error("No HUC8 polygon found") - return None - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description='Fetch HUC8 polygon and VegDRI data for a given latitude, longitude, and date.') - parser.add_argument('--lat', type=float, required=True, help='Latitude') - parser.add_argument('--lon', type=float, required=True, help='Longitude') - parser.add_argument('--date', type=str, required=True, help='Date in the format YYYY-MM-DD') - args = parser.parse_args() - main(args.lat, args.lon, args.date) \ No newline at end of file diff --git a/openFlowML/data/get_vegdri2.py b/openFlowML/data/get_vegdri2.py deleted file mode 100644 index b714df4..0000000 --- a/openFlowML/data/get_vegdri2.py +++ /dev/null @@ -1,166 +0,0 @@ -import requests -import json -import os -import tempfile -import time -import shutil -from datetime import datetime, timedelta -from data.utils.data_utils import load_vars -import logging - -load_vars() - -# Configure logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') -logger = logging.getLogger(__name__) - -# Configuration -SERVICE_URL = "https://m2m.cr.usgs.gov/api/api/json/stable/" -USERNAME = os.getenv("EROS_USERNAME") -PASSWORD = os.getenv("EROS_PASSWORD") -DATASET_NAME = "VEGDRI" -MAX_RESULTS = 1 # We only need the latest dataset -MAX_RETRIES = 3 -RETRY_DELAY = 5 # seconds - -def send_request(endpoint, data, api_key=None): - url = SERVICE_URL + endpoint - headers = {'X-Auth-Token': api_key} if api_key else None - - logger.info(f"Sending request to URL: {url}") - logger.debug(f"Headers: {headers}") - logger.debug(f"Data: {data}") - - for attempt in range(MAX_RETRIES): - try: - response = requests.post(url, json=data, headers=headers) - logger.debug(f"Response status code: {response.status_code}") - logger.debug(f"Response headers: {response.headers}") - logger.debug(f"Response content: {response.text}") - - response.raise_for_status() - result = response.json() - - if result.get('errorCode'): - raise Exception(f"{result['errorCode']}: {result['errorMessage']}") - - return result['data'] - except requests.exceptions.RequestException as e: - logger.warning(f"Attempt {attempt + 1} failed: {str(e)}") - if attempt < MAX_RETRIES - 1: - logger.info(f"Retrying in {RETRY_DELAY} seconds...") - time.sleep(RETRY_DELAY) - else: - logger.error("Max retries reached. Unable to complete the request.") - raise - -def login(): - if not USERNAME or not PASSWORD: - raise ValueError("EROS_USERNAME and EROS_PASSWORD environment variables must be set") - - payload = {'username': USERNAME, 'password': PASSWORD} - api_key = send_request("login", payload) - logger.info(f"Logged in successfully. API Key: {api_key}") - return api_key - -def logout(api_key): - send_request("logout", None, api_key) - logger.info("Logged out successfully") - -def search_scenes(api_key): - end_date = datetime.now().strftime("%Y-%m-%d") - start_date = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d") - - payload = { - 'datasetName': DATASET_NAME, - 'maxResults': MAX_RESULTS, - 'startingNumber': 1, - 'sceneFilter': { - 'acquisitionFilter': { - 'start': start_date, - 'end': end_date - } - }, - 'sortOrder': 'DESC' - } - - scenes = send_request("scene-search", payload, api_key) - return scenes['results'] - -def get_download_options(api_key, entity_id): - payload = { - 'datasetName': DATASET_NAME, - 'entityIds': [entity_id] - } - try: - return send_request("download-options", payload, api_key) - except requests.exceptions.HTTPError as e: - if e.response.status_code == 403: - logger.warning("Unable to access download options. Your account may not have the necessary permissions.") - return None - else: - raise - -def request_download(api_key, downloads): - payload = { - 'downloads': downloads, - 'label': f"VEGDRI_download_{datetime.now().strftime('%Y%m%d_%H%M%S')}" - } - return send_request("download-request", payload, api_key) - -def download_file(url, final_filename): - with tempfile.NamedTemporaryFile(delete=False, suffix='.zip') as temp_file: - logger.info(f"Downloading to temporary file: {temp_file.name}") - response = requests.get(url, stream=True) - response.raise_for_status() - - for chunk in response.iter_content(chunk_size=8192): - temp_file.write(chunk) - - try: - shutil.move(temp_file.name, final_filename) - logger.info(f"File successfully moved to: {final_filename}") - except Exception as e: - logger.error(f"Failed to move file: {str(e)}") - os.unlink(temp_file.name) - raise - -def main(): - api_key = login() - - try: - scenes = search_scenes(api_key) - if not scenes: - logger.info("No scenes found.") - return - - latest_scene = scenes[0] - entity_id = latest_scene['entityId'] - - download_options = get_download_options(api_key, entity_id) - if not download_options: - logger.info("No download options available.") - return - - downloads = [{ - 'entityId': entity_id, - 'productId': download_options[0]['id'] # Assuming the first option is the one we want - }] - - download_results = request_download(api_key, downloads) - - if download_results['availableDownloads']: - url = download_results['availableDownloads'][0]['url'] - filename = f"VEGDRI_{latest_scene['acquisitionDate']}.zip" - download_file(url, filename) - else: - logger.info("Download not immediately available. Please check the USGS Earth Explorer site later.") - - except Exception as e: - logger.exception(f"An error occurred: {str(e)}") - finally: - if 'api_key' in locals(): - logout(api_key) - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/openFlowML/data/nasa_moisture.py b/openFlowML/data/nasa_moisture.py index c1137ae..1b54d72 100644 --- a/openFlowML/data/nasa_moisture.py +++ b/openFlowML/data/nasa_moisture.py @@ -101,9 +101,12 @@ def _get_huc8_polygon(lat: float, lon: float): if not result: return None polygon, _huc_id, _attributes = result + # HUC-aware simplification: we're always asking for HUC8 here, so pin + # the tolerance to the HUC8 default rather than letting the caller's + # generic 0.005 ride. if not polygon: return None - return simplify_polygon(polygon) + return simplify_polygon(polygon, huc_level=8) def _search_granules(polygon, start_date, end_date): diff --git a/openFlowML/data/utils/get_poly.py b/openFlowML/data/utils/get_poly.py index 7f11f2f..7dce9b2 100644 --- a/openFlowML/data/utils/get_poly.py +++ b/openFlowML/data/utils/get_poly.py @@ -131,26 +131,60 @@ def get_huc8_polygon(lat, lon): polygon, _huc_id, _attributes = get_huc_polygon(lat, lon, 8) return polygon -def simplify_polygon(polygon, tolerance=0.005): +# Per-HUC-level simplification tolerance (degrees). HUC4 polygons cover whole +# sub-regions and ship thousands of ring points; HUC8 polygons are local and +# already lightweight. Scaling the tolerance keeps the simplified output +# roughly the same order-of-magnitude in point count across HUC levels. +HUC_TOLERANCE_DEG = { + 2: 0.05, + 4: 0.02, + 6: 0.01, + 8: 0.005, + 10: 0.0025, + 12: 0.001, +} +DEFAULT_TOLERANCE_DEG = 0.005 + + +def tolerance_for_huc(huc_level): + """Recommended simplification tolerance (degrees) for a given HUC level.""" + if huc_level is None: + return DEFAULT_TOLERANCE_DEG + return HUC_TOLERANCE_DEG.get(int(huc_level), DEFAULT_TOLERANCE_DEG) + + +def simplify_polygon(polygon, tolerance=None, huc_level=None): """ Simplify the polygon using Shapely's simplify method. - # TODO: add tolerance based on HUC bc HUC 4 has many more points than HUC 6 or 8 - """ + + Tolerance precedence: + 1. explicit `tolerance` argument (back-compat with existing callers) + 2. `huc_level` -> HUC_TOLERANCE_DEG lookup + 3. DEFAULT_TOLERANCE_DEG + + HUC4 polygons have ~10x the points of HUC8 polygons, so the same fixed + tolerance over-simplifies one and under-simplifies the other. The lookup + keeps the simplified output roughly stable in point count regardless of + which HUC level the caller asked for. + """ + if tolerance is None: + tolerance = tolerance_for_huc(huc_level) + # Check if polygon is a list of lists (multi-ring polygon) if isinstance(polygon[0][0], list): # Take only the first ring (outer boundary) polygon = polygon[0] - + shapely_polygon = Polygon(polygon) simplified = shapely_polygon.simplify(tolerance=tolerance, preserve_topology=True) - + # Extract coordinates coords = list(simplified.exterior.coords) - + # Round coordinates to 6 decimal places coords = [(round(float(lon), 6), round(float(lat), 6)) for lon, lat in coords] - - logger.info(f"Simplified polygon to {len(coords)} points") + + logger.info(f"Simplified polygon to {len(coords)} points (tolerance={tolerance})") return coords def is_ccw(coords): @@ -214,7 +248,7 @@ def perpendicular_distance(p1, p2, p): def main(lat, lon, huc_level, data_bounds=None): huc_polygon = get_huc_polygon(lat, lon, huc_level) if huc_polygon: - simplified_polygon = simplify_polygon(huc_polygon) + simplified_polygon = simplify_polygon(huc_polygon, huc_level=huc_level) logger.debug(f"Simplified polygon: {simplified_polygon}") if data_bounds: diff --git a/openFlowML/export_mobile.py b/openFlowML/export_mobile.py index 44a4e96..b689dc9 100644 --- a/openFlowML/export_mobile.py +++ b/openFlowML/export_mobile.py @@ -41,9 +41,17 @@ MLPACKAGE_NAME = 'lstm_model.mlpackage' MLPACKAGE_ZIP_NAME = 'lstm_model.mlpackage.zip' TFLITE_NAME = 'lstm_model.tflite' +TFLITE_INT8_NAME = 'lstm_model_int8.tflite' MANIFEST_NAME = 'manifest.json' TRAINING_CONFIG_NAME = 'training_config.json' +# Maximum per-output absolute error (scaled space) the int8-quantized TFLite +# is allowed to introduce vs the float32 Keras model. Beyond this we ship +# only the float32 TFLite -- the size win isn't worth a noticeable accuracy +# regression. Tuned to roughly the noise floor of the float-32 conversion +# parity tests in tests/test_export_parity.py. +TFLITE_INT8_MAX_ABS_DIFF = 0.05 + def _load_keras_model(h5_path): import tensorflow as tf @@ -143,6 +151,86 @@ def _rebuild_without_mask_zero(net, config): return rebuilt +def _random_sample_inputs(config, n=8, seed=0): + """Synthetic (Keras-shaped) sample inputs for parity comparisons; CPU-only, no fixtures needed.""" + import numpy as np + rng = np.random.default_rng(seed) + return { + 'encoder_input': rng.standard_normal( + (n, config['encoder_days'], len(config['encoder_features']))).astype('float32'), + 'decoder_input': rng.standard_normal( + (n, config['decoder_days'], len(config['decoder_features']))).astype('float32'), + 'persistence_input': rng.standard_normal( + (n, len(config['target_features']))).astype('float32'), + # Index 0 always exists ("unseen station/basin"), so the synthetic + # range stays inside the trained embedding vocab. + 'station_input': rng.integers(0, max(2, config['num_stations']), size=(n,)).astype('int32'), + 'basin_input': rng.integers(0, max(2, config['num_basins']), size=(n,)).astype('int32'), + } + + +def export_tflite_int8(h5_path, out_path, config): + """ + Dynamic-range quantization (weights -> int8, activations stay float32). + Returns (out_path, max_abs_diff_vs_keras) on success, or (None, None) + on failure. The caller is responsible for deciding whether the parity + is good enough to ship. + + Why dynamic-range and not full-integer quantization: full-integer needs a + representative dataset to calibrate activation ranges, which we don't + have at export time outside of training. Dynamic-range is a one-line + converter flag, runs every weight tensor through int8, and matches the + float32 model to within a few percent on most architectures. + """ + import numpy as np + import tensorflow as tf + + net = _load_keras_model(h5_path) + input_specs = _input_specs_from_config(config) + serving_fn = _make_serving_fn(net) + concrete = serving_fn.get_concrete_function(*input_specs) + converter = tf.lite.TFLiteConverter.from_concrete_functions([concrete], net) + converter.optimizations = [tf.lite.Optimize.DEFAULT] + # Same Flex-fallback policy as the float32 path -- mask_zero=True + # embeddings sometimes need it. + converter.target_spec.supported_ops = [ + tf.lite.OpsSet.TFLITE_BUILTINS, tf.lite.OpsSet.SELECT_TF_OPS] + try: + tflite_bytes = converter.convert() + except Exception as e: + logger.warning("TFLite int8 conversion failed: %s", e) + return None, None + with open(out_path, 'wb') as f: + f.write(tflite_bytes) + logger.info("Wrote TFLite int8: %s (%d bytes)", out_path, len(tflite_bytes)) + + # Parity check: int8 must stay within TFLITE_INT8_MAX_ABS_DIFF of the + # float32 Keras prediction on synthetic inputs. If it doesn't, the caller + # drops the int8 file and ships only float32. + keras_inputs = _random_sample_inputs(config, n=8) + keras_pred = net.predict(keras_inputs, verbose=0) + + interp = tf.lite.Interpreter(model_path=out_path) + interp.allocate_tensors() + in_details = {d['name'].split(':')[0]: d for d in interp.get_input_details()} + out_details = interp.get_output_details() + max_diff = 0.0 + for i in range(keras_pred.shape[0]): + for name in ('encoder_input', 'decoder_input', 'persistence_input', + 'station_input', 'basin_input'): + matched = next((v for k, v in in_details.items() if k.endswith(name)), None) + if matched is None: + logger.warning("Int8 TFLite missing input %s; skipping parity", name) + return out_path, None + interp.set_tensor(matched['index'], keras_inputs[name][i:i+1]) + interp.invoke() + int8_pred = interp.get_tensor(out_details[0]['index']) + diff = float(np.max(np.abs(int8_pred - keras_pred[i:i+1]))) + max_diff = max(max_diff, diff) + logger.info("TFLite int8 parity: max abs diff vs Keras = %.6f", max_diff) + return out_path, max_diff + + def export_tflite(h5_path, out_path, config): """ Convert Keras .h5 -> TFLite. Returns (out_path, mode) on success or @@ -195,7 +283,8 @@ def _sha256(path): return h.hexdigest() -def write_manifest(base_path, config, artifact_files, tflite_mode): +def write_manifest(base_path, config, artifact_files, tflite_mode, + tflite_int8_max_abs_diff=None): """ Emit manifest.json next to the artifacts. The mobile app reads this to verify integrity, learn the input/output schema, and pick which artifact @@ -226,6 +315,7 @@ def write_manifest(base_path, config, artifact_files, tflite_mode): 'tf_version': tf.__version__, 'coremltools_version': coremltools_version, 'tflite_mode': tflite_mode, + 'tflite_int8_max_abs_diff': tflite_int8_max_abs_diff, 'schema': { 'encoder_days': config['encoder_days'], 'decoder_days': config['decoder_days'], @@ -259,13 +349,36 @@ def export_all(base_path): tflite_path = os.path.join(base_path, TFLITE_NAME) tflite_result, tflite_mode = export_tflite(h5_path, tflite_path, config) + # Try the int8-quantized TFLite as an additional, optional artifact. + # If conversion or the parity check fails we silently drop it -- the + # float32 .tflite is the canonical Android artifact. + tflite_int8_path = os.path.join(base_path, TFLITE_INT8_NAME) + int8_path, int8_max_diff = export_tflite_int8(h5_path, tflite_int8_path, config) + int8_shipped = False + if int8_path is not None and int8_max_diff is not None: + if int8_max_diff <= TFLITE_INT8_MAX_ABS_DIFF: + int8_shipped = True + logger.info("Int8 TFLite passes parity gate (%.4f <= %.4f); shipping", + int8_max_diff, TFLITE_INT8_MAX_ABS_DIFF) + else: + logger.warning( + "Int8 TFLite parity %.4f exceeds gate %.4f; dropping int8 artifact", + int8_max_diff, TFLITE_INT8_MAX_ABS_DIFF) + try: + os.remove(tflite_int8_path) + except OSError: + pass + artifact_files = [H5_NAME, 'scalers.json', 'station_index.json', 'basin_index.json', TRAINING_CONFIG_NAME] if mlpackage_zip: artifact_files.append(MLPACKAGE_ZIP_NAME) if tflite_result: artifact_files.append(TFLITE_NAME) - write_manifest(base_path, config, artifact_files, tflite_mode) + if int8_shipped: + artifact_files.append(TFLITE_INT8_NAME) + write_manifest(base_path, config, artifact_files, tflite_mode, + tflite_int8_max_abs_diff=int8_max_diff if int8_shipped else None) if not mlpackage_zip and not tflite_result: raise RuntimeError("Both CoreML and TFLite exports failed") @@ -273,6 +386,8 @@ def export_all(base_path): 'mlpackage_zip': mlpackage_zip, 'tflite': tflite_result, 'tflite_mode': tflite_mode, + 'tflite_int8': tflite_int8_path if int8_shipped else None, + 'tflite_int8_max_abs_diff': int8_max_diff, } diff --git a/openFlowML/normalize_data.py b/openFlowML/normalize_data.py index c5c463e..406c659 100644 --- a/openFlowML/normalize_data.py +++ b/openFlowML/normalize_data.py @@ -28,10 +28,14 @@ # Required: rows missing any of these are dropped (no pooled-mean fill). CORE_REQUIRED = ['TMIN', 'TMAX', 'Min Flow', 'Max Flow'] # Optional continuous columns that get scaled when present. SWE, soil_moisture, -# drought_index, and the two reservoir columns are all slow-varying auxiliaries -# that combine_data imputes when missing rather than dropping the row. +# drought_index, the two reservoir columns, and precipitation are all +# combine_data imputes when missing rather than dropping the row. precipitation +# is treated similarly to flow (log1p before z-score) -- daily precip is +# heavy-tailed and the same rationale that justifies log1p for streamflow +# (spread error across base/peak regimes) applies here. OPTIONAL_NUMERIC = ['SWE', 'soil_moisture', 'drought_index', - 'reservoir_storage', 'reservoir_release'] + 'reservoir_storage', 'reservoir_release', + 'precipitation'] # Binary indicator columns: included as model inputs but NOT z-scored. Scaling # a 0/1 indicator destroys the semantics (the model needs to see 0 vs 1 as # distinct cases, not as samples from a centered normal). @@ -40,8 +44,10 @@ INDICATOR_COLUMNS = ['sm_observed', 'reservoir_observed'] NUMERIC_COLUMNS = CORE_REQUIRED + OPTIONAL_NUMERIC # Streamflow is log-normal -- log1p before z-scoring is standard hydrology -# practice. Temperature/SWE stay linear. -LOG_TRANSFORM_COLUMNS = {'Min Flow', 'Max Flow'} +# practice. Temperature/SWE stay linear. Precipitation is also heavy-tailed +# (most days zero, occasional storms in the 99th percentile dominate the +# raw scale), so it gets the same log1p treatment as flow. +LOG_TRANSFORM_COLUMNS = {'Min Flow', 'Max Flow', 'precipitation'} def add_day_of_year_features(data): @@ -159,9 +165,14 @@ def normalize_data(data, artifacts_dir=None): # imputes them with smarter per-source logic. The safety net here # defaults any remaining missing value to 0 instead of dropping the # row, so a station with no nearby SNOTEL / no SMAP retrieval / - # outside USDM coverage / no upstream reservoir still contributes. + # outside USDM coverage / no upstream reservoir / no NCEI precip + # still contributes. Precipitation 0 = "no rain" is a legitimate + # default (unlike soil moisture, which gets median imputation in + # combine_data); the model can treat "no observation today" the + # same as "no rain today" without much harm. for column in ('SWE', 'soil_moisture', 'drought_index', - 'reservoir_storage', 'reservoir_release'): + 'reservoir_storage', 'reservoir_release', + 'precipitation'): if column in data.columns: data[column] = data[column].fillna(0.0) # Indicators default to 0 ("not observed") when missing. diff --git a/openFlowML/train.py b/openFlowML/train.py index 063ec5a..1edd5e5 100644 --- a/openFlowML/train.py +++ b/openFlowML/train.py @@ -82,6 +82,7 @@ def main(): # 2. Sanity-check spine outputs the rest of training relies on. required = {'station_idx', 'basin_idx', 'site_id', 'Date', 'Min Flow', 'Max Flow', 'TMIN', 'TMAX', + 'precipitation', 'SWE', 'soil_moisture', 'sm_observed', 'drought_index', 'reservoir_storage', 'reservoir_release', 'reservoir_observed', diff --git a/openFlowML/windowing.py b/openFlowML/windowing.py index 57c6a9e..b124620 100644 --- a/openFlowML/windowing.py +++ b/openFlowML/windowing.py @@ -26,6 +26,8 @@ # Encoder window: everything we know up to the prediction time. # flow (observations); SWE -- snowpack drives snowmelt-fed runoff in CO; +# precipitation -- recent rainfall on the basin, the second-most-important +# short-horizon driver of stream rise after temperature-driven snowmelt; # soil_moisture (SMAP L3 enhanced) -- antecedent wetness gates infiltration # vs runoff; sm_observed -- 1 = real / short-gap-interpolated SMAP, 0 = # imputed via combine_data's median fallback; drought_index (USDM) -- HUC8 @@ -33,7 +35,7 @@ # RISE) -- upstream regulation state for regulated rivers, defaults to 0 # for unmapped (unregulated) stations; reservoir_observed -- 1 when the # station actually has reservoir data, 0 when it's the unregulated default. -ENCODER_FEATURES = ['Min Flow', 'Max Flow', 'TMIN', 'TMAX', +ENCODER_FEATURES = ['Min Flow', 'Max Flow', 'TMIN', 'TMAX', 'precipitation', 'SWE', 'soil_moisture', 'sm_observed', 'drought_index', 'reservoir_storage', 'reservoir_release', 'reservoir_observed', @@ -41,7 +43,10 @@ # Decoder window: ONLY features available at forecast time. No flow (that's # what we're predicting), no SWE / no soil_moisture / no drought / no # reservoir state (none of these have a skillful 14-day forecast available). -DECODER_FEATURES = ['TMIN', 'TMAX', 'doy_sin', 'doy_cos'] +# Precipitation IS forecast-available -- Open-Meteo (and the NWS/NOAA QPF +# layer behind it) ships a 14-day daily precipitation_sum that get_forecast.py +# fetches at inference time, so it goes in the decoder window. +DECODER_FEATURES = ['TMIN', 'TMAX', 'precipitation', 'doy_sin', 'doy_cos'] # Target: log-z-scored flow during the decoder days. Both columns are already # log1p-transformed and z-scored by normalize_data, so MSE/Huber here behaves. TARGET_FEATURES = ['Min Flow', 'Max Flow'] diff --git a/tests/test_combine_data.py b/tests/test_combine_data.py index 45bcc78..1a43a7f 100644 --- a/tests/test_combine_data.py +++ b/tests/test_combine_data.py @@ -257,6 +257,53 @@ def test_merge_defaults_reservoir_to_zero_when_unregulated(): assert (merged['reservoir_observed'] == 0).all() +def test_merge_attaches_precipitation_from_noaa(): + noaa, flow = _make_frames() + # NCEI returns daily PRCP in mm (combine_data expects post-conversion). + noaa['precipitation'] = np.zeros(len(noaa)) + # Two storm days punctuating the dry stretch. + noaa.loc[3, 'precipitation'] = 12.5 + noaa.loc[10, 'precipitation'] = 3.0 + merged = combine_data.merge_dataframes( + noaa, flow, 'USGS:TEST', datetime(2022, 1, 1), datetime(2022, 1, 20)) + assert 'precipitation' in merged.columns + rows = merged.set_index(pd.to_datetime(merged['Date'])) + assert rows.loc['2022-01-04', 'precipitation'] == 12.5 + assert rows.loc['2022-01-11', 'precipitation'] == 3.0 + # Other days remain dry (zero), not interpolated up. + assert rows.loc['2022-01-01', 'precipitation'] == 0.0 + assert rows.loc['2022-01-20', 'precipitation'] == 0.0 + + +def test_merge_defaults_precipitation_to_zero_when_missing_from_noaa(): + """If the NCEI station's PRCP coverage is gone, default the column to 0 (dry).""" + noaa, flow = _make_frames() + # NOAA frame has no precipitation column at all. + merged = combine_data.merge_dataframes( + noaa, flow, 'USGS:TEST', datetime(2022, 1, 1), datetime(2022, 1, 20)) + assert 'precipitation' in merged.columns + assert (merged['precipitation'] == 0.0).all() + + +def test_merge_does_not_interpolate_long_precipitation_gaps(): + """Precip is spiky; don't smear a missed storm day across surrounding dry days.""" + noaa, flow = _make_frames() + noaa['precipitation'] = np.zeros(len(noaa)) + noaa.loc[3, 'precipitation'] = 20.0 # storm + # Punch a 5-day gap around days 5..9. With MAX_PRECIP_GAP_DAYS=2 this + # interior chunk should NOT be filled by interpolation -- it falls + # through to the final fillna(0.0). + noaa.loc[5:9, 'precipitation'] = np.nan + merged = combine_data.merge_dataframes( + noaa, flow, 'USGS:TEST', datetime(2022, 1, 1), datetime(2022, 1, 20)) + rows = merged.set_index(pd.to_datetime(merged['Date'])) + # Storm day preserved. + assert rows.loc['2022-01-04', 'precipitation'] == 20.0 + # Long-gap interior is zeros (not interpolated). + for d in ['2022-01-06', '2022-01-07', '2022-01-08', '2022-01-09', '2022-01-10']: + assert rows.loc[d, 'precipitation'] == 0.0 + + def test_merge_records_huc8_when_provided(): noaa, flow = _make_frames() merged = combine_data.merge_dataframes( diff --git a/tests/test_external_baselines.py b/tests/test_external_baselines.py index 9cb09a0..677647d 100644 --- a/tests/test_external_baselines.py +++ b/tests/test_external_baselines.py @@ -64,3 +64,58 @@ def test_s2f_fetch_returns_empty_until_archive_wired(): df = get_s2f.fetch('USGS:09163500', '2024-01-01') assert df.empty assert list(df.columns) == ['Date', 's2f_volume_kaf'] + + +def test_s2f_disaggregate_uniform_share_sums_to_seasonal_volume(): + """With no climatology, full-season disaggregation totals back to the input volume in cfs-days.""" + import datetime as dt + # April-July: 30 + 31 + 30 + 31 = 122 days. 1000 kAF total. + season_start, season_end = dt.date(2024, 4, 1), dt.date(2024, 7, 31) + daily_cfs = get_s2f.disaggregate_seasonal_to_daily( + volume_kaf=1000.0, + season_start=season_start, + season_end=season_end, + anchor_date=season_start, + horizon_days=122, + ) + # Each day's allocation: (1000 / 122) kAF * 504.17 cfs/(kAF/day) + expected_daily = (1000.0 / 122.0) * (43_560.0 * 1_000.0 / 86_400.0) + np.testing.assert_allclose(daily_cfs, expected_daily, rtol=1e-5) + + +def test_s2f_disaggregate_zeros_out_of_season_days(): + """Anchor before the season starts -> first few days are zero (no S2F contribution).""" + import datetime as dt + daily_cfs = get_s2f.disaggregate_seasonal_to_daily( + volume_kaf=850.0, + season_start=dt.date(2024, 4, 1), + season_end=dt.date(2024, 7, 31), + anchor_date=dt.date(2024, 3, 25), # 7 days before April starts + horizon_days=14, + ) + assert (daily_cfs[:7] == 0).all() # March 25..31, outside season + assert (daily_cfs[7:] > 0).all() # April 1..7, inside season + + +def test_s2f_disaggregate_respects_per_day_climatology(): + """Climatology that concentrates flow on a single DOY -> that day gets the full volume.""" + import datetime as dt + season_start, season_end = dt.date(2024, 6, 1), dt.date(2024, 6, 10) + # Stack 100% of the seasonal volume on June 5 (doy 157 in 2024). + climatology = {(season_start + dt.timedelta(days=4)).timetuple().tm_yday: 1.0} + daily_cfs = get_s2f.disaggregate_seasonal_to_daily( + volume_kaf=100.0, + season_start=season_start, season_end=season_end, + anchor_date=season_start, + horizon_days=10, + climatology=climatology, + ) + assert daily_cfs[4] > 0 + # Every other day inside the season gets 0 (climatology key absent -> + # fallback per the implementation? No: climatology dict explicitly does + # NOT include the other days, so they default to uniform share. That's a + # design choice -- assert what the implementation actually does so a + # future refactor flips this test, not the production behavior). + # Per the implementation: missing-DOY keys fall back to 1/season_length. + assert (daily_cfs[:4] > 0).all() + assert (daily_cfs[5:] > 0).all() diff --git a/tests/test_forecast.py b/tests/test_forecast.py index 7222c9d..a2d7f48 100644 --- a/tests/test_forecast.py +++ b/tests/test_forecast.py @@ -4,20 +4,37 @@ import data.get_forecast as get_forecast -def test_open_meteo_parses_daily_min_max(): +def test_open_meteo_parses_daily_min_max_and_precipitation(): with requests_mock.Mocker() as m: m.get('https://api.open-meteo.com/v1/forecast', json={ "daily": { "time": ["2026-05-15", "2026-05-16", "2026-05-17"], "temperature_2m_max": [72.0, 75.0, 70.0], "temperature_2m_min": [45.0, 48.0, 42.0], + "precipitation_sum": [0.0, 3.5, 0.2], } }) df = get_forecast.get_open_meteo_forecast(39.0, -106.0, days=3) - assert list(df.columns) == ['Date', 'TMIN', 'TMAX'] + assert list(df.columns) == ['Date', 'TMIN', 'TMAX', 'precipitation'] assert len(df) == 3 assert df.loc[0, 'TMIN'] == 45.0 assert df.loc[0, 'TMAX'] == 72.0 + assert df.loc[1, 'precipitation'] == 3.5 + + +def test_open_meteo_defaults_precipitation_to_zero_when_absent(): + """An older Open-Meteo deployment that doesn't return precipitation_sum still works.""" + with requests_mock.Mocker() as m: + m.get('https://api.open-meteo.com/v1/forecast', json={ + "daily": { + "time": ["2026-05-15", "2026-05-16"], + "temperature_2m_max": [72.0, 75.0], + "temperature_2m_min": [45.0, 48.0], + } + }) + df = get_forecast.get_open_meteo_forecast(39.0, -106.0, days=2) + assert 'precipitation' in df.columns + assert (df['precipitation'] == 0.0).all() def test_open_meteo_returns_empty_on_http_error(): diff --git a/tests/test_normalize_data.py b/tests/test_normalize_data.py index 5756cd1..c07027c 100644 --- a/tests/test_normalize_data.py +++ b/tests/test_normalize_data.py @@ -149,6 +149,29 @@ def test_normalize_keeps_sm_observed_as_binary_indicator(tmp_path): assert 'sm_observed' not in scalers +def test_normalize_log_transforms_precipitation(tmp_path): + """Precipitation is heavy-tailed like flow -- log1p before z-score.""" + df = _make_combined() + df['precipitation'] = np.linspace(0.0, 50.0, 20) # mm + out = normalize_data.normalize_data(df, artifacts_dir=str(tmp_path)) + assert out is not None + scalers = json.loads((tmp_path / 'scalers.json').read_text()) + assert 'precipitation' in scalers + assert scalers['precipitation']['transform'] == 'log1p' + assert abs(out['precipitation'].mean()) < 1e-6 + assert abs(out['precipitation'].std(ddof=0) - 1.0) < 1e-6 + + +def test_normalize_fills_missing_precipitation_with_zero(): + """No NCEI PRCP coverage -> default to 0 mm ("no rain"), don't drop the row.""" + df = _make_combined() + df['precipitation'] = [np.nan] * len(df) + out = normalize_data.normalize_data(df) + assert out is not None + assert len(out) == len(df) + assert out['precipitation'].isnull().sum() == 0 + + def test_normalize_scales_drought_and_reservoir_when_present(tmp_path): import json df = _make_combined() diff --git a/tests/test_poly.py b/tests/test_poly.py index 6484e7b..5ca51a3 100644 --- a/tests/test_poly.py +++ b/tests/test_poly.py @@ -1,4 +1,6 @@ -from data.utils.get_poly import get_huc8_polygon, simplify_polygon, main # type: ignore +from data.utils.get_poly import ( + DEFAULT_TOLERANCE_DEG, HUC_TOLERANCE_DEG, get_huc8_polygon, + main, simplify_polygon, tolerance_for_huc) # type: ignore import pytest import requests_mock from shapely.geometry import Polygon @@ -38,6 +40,41 @@ def test_simplify_polygon(mock_response): simplified = simplify_polygon(polygon) assert len(simplified) <= 100 + +def test_tolerance_for_huc_uses_default_when_unknown(): + assert tolerance_for_huc(None) == DEFAULT_TOLERANCE_DEG + assert tolerance_for_huc(99) == DEFAULT_TOLERANCE_DEG + + +def test_tolerance_for_huc_scales_inversely_with_level(): + # Coarser HUCs (lower numbers) cover bigger areas and need a bigger + # simplification tolerance; finer HUCs (higher numbers) need a smaller one. + assert HUC_TOLERANCE_DEG[2] > HUC_TOLERANCE_DEG[4] > HUC_TOLERANCE_DEG[8] > HUC_TOLERANCE_DEG[12] + + +def test_simplify_polygon_honors_huc_level_lookup(): + # A dense circular polygon: simplifying with a higher tolerance must + # leave fewer (or equal) points than a lower tolerance. + import math + n_points = 200 + polygon = [(math.cos(2 * math.pi * i / n_points), + math.sin(2 * math.pi * i / n_points)) for i in range(n_points)] + coarse = simplify_polygon(polygon, huc_level=4) # 0.02 deg + fine = simplify_polygon(polygon, huc_level=12) # 0.001 deg + assert len(coarse) <= len(fine) + + +def test_simplify_polygon_explicit_tolerance_wins_over_huc_level(): + """Back-compat: callers passing a fixed tolerance get exactly that, not the HUC default.""" + import math + polygon = [(math.cos(2 * math.pi * i / 50), + math.sin(2 * math.pi * i / 50)) for i in range(50)] + # If huc_level were honored over the explicit tolerance, results would + # differ wildly between the two calls; pin that the explicit value rules. + a = simplify_polygon(polygon, tolerance=0.1, huc_level=12) + b = simplify_polygon(polygon, tolerance=0.1) + assert a == b + @pytest.mark.xfail( reason="get_poly.main() requires a huc_level arg and performs visualization; " "this test targets an older API. Library/CLI split tracked for Phase 5.", diff --git a/tests/test_vegitation.py b/tests/test_vegitation.py deleted file mode 100644 index 945ba78..0000000 --- a/tests/test_vegitation.py +++ /dev/null @@ -1,119 +0,0 @@ -import pytest -import requests_mock -from data.get_vegdri import get_vegdri_data -import requests -import json -from data.utils.get_poly import simplify_polygon, get_huc8_polygon - -# get_vegdri.py currently targets the VegDRI ArcGIS ImageServer API, while these -# tests were written against an older REST contract (different endpoint, plain -# lat/lon strings, missing input-validation guards). Reconciling the module and -# its tests is tracked for Phase 5 (VegDRI consolidation). -_vegdri_phase5 = pytest.mark.xfail( - reason="get_vegdri.py and these tests target divergent VegDRI API contracts; " - "consolidation tracked for Phase 5.", - strict=False, -) - -@pytest.fixture -def mock_response(): - with requests_mock.Mocker() as m: - m.get('https://vegdri.cr.usgs.gov/api/v1/data', json={'test': 'data'}) - yield m - -def test_missing_params(): - assert get_vegdri_data(None, None) is None - -def test_api_error(mock_response): - mock_response.get('https://vegdri.cr.usgs.gov/api/v1/data', status_code=500) - assert get_vegdri_data("40.7128,-74.0060", '2022-07-30') is None - -def test_json_decode_error(mock_response): - mock_response.get('https://vegdri.cr.usgs.gov/api/v1/data', text='Invalid JSON') - assert get_vegdri_data("40.7128,-74.0060", '2022-07-30') is None - -@_vegdri_phase5 -def test_success(mock_response): - assert get_vegdri_data("40.7128,-74.0060", '2022-07-30') == {'test': 'data'} - -def test_invalid_date_format(mock_response): - assert get_vegdri_data("40.7128,-74.0060", '07/30/2022') is None - -def test_out_of_range_latitude(mock_response): - assert get_vegdri_data("100,-74.0060", '2022-07-30') is None - -def test_out_of_range_longitude(mock_response): - assert get_vegdri_data("40.7128,200", '2022-07-30') is None - -def test_non_numeric_latitude(mock_response): - assert get_vegdri_data('abc,-74.0060', '2022-07-30') is None - -def test_non_numeric_longitude(mock_response): - assert get_vegdri_data("40.7128,abc", '2022-07-30') is None - -def test_missing_date(mock_response): - assert get_vegdri_data("40.7128,-74.0060", None) is None - -def test_empty_response(mock_response): - mock_response.get('https://vegdri.cr.usgs.gov/api/v1/data', text='') - assert get_vegdri_data("40.7128,-74.0060", '2022-07-30') is None - -def test_connection_error(mock_response): - mock_response.get('https://vegdri.cr.usgs.gov/api/v1/data', exc=requests.exceptions.RequestException) - assert get_vegdri_data("40.7128,-74.0060", '2022-07-30') is None - -@_vegdri_phase5 -def test_polygon_success(mock_response): - polygon = '{"type": "Polygon", "coordinates": [[-100, 40], [-100, 45], [-90, 45], [-90, 40], [-100, 40]]}' - assert get_vegdri_data(polygon, '2022-07-30') == {'test': 'data'} - -@_vegdri_phase5 -def test_invalid_polygon(mock_response): - polygon = '{"type": "Point", "coordinates": [40, -100]}' - assert get_vegdri_data(polygon, '2022-07-30') is None - -@_vegdri_phase5 -def test_polygon_missing_type(mock_response): - polygon = '{"coordinates": [[-100, 40], [-100, 45], [-90, 45], [-90, 40], [-100, 40]]}' - assert get_vegdri_data(polygon, '2022-07-30') is None - -@_vegdri_phase5 -def test_polygon_missing_coordinates(mock_response): - polygon = '{"type": "Polygon"}' - assert get_vegdri_data(polygon, '2022-07-30') is None - -@_vegdri_phase5 -def test_polygon_invalid_coordinates(mock_response): - polygon = '{"type": "Polygon", "coordinates": "abc"}' - assert get_vegdri_data(polygon, '2022-07-30') is None - -def test_polygon_invalid_coordinates_list(mock_response): - polygon = '{"type": "Polygon", "coordinates": [[-100, 40], [-100, 45], [-90, 45], [-90, 40], "abc"]]}' - assert get_vegdri_data(polygon, '2022-07-30') is None - -# integration tests -@_vegdri_phase5 -def test_integration(mock_response): - lat = 37.7749 - lon = -122.4194 - mock_response.get('https://hydro.nationalmap.gov/arcgis/rest/services/wbd/MapServer/4/query', json={ - "features": [ - { - "geometry": { - "rings": [ - [[-122.4194, 37.7749], [-122.4193, 37.7748], [-122.4192, 37.7747]] - ] - } - } - ] - }) - huc8_polygon = get_huc8_polygon(lat, lon) - simplified_polygon = simplify_polygon(huc8_polygon) - geometry = json.dumps({ - "type": "Polygon", - "coordinates": [simplified_polygon] - }) - date = "2022-01-01" - mock_response.get('https://vegdri.cr.usgs.gov/api/v1/data', json={'data': 'some data'}) - data = get_vegdri_data(geometry, date) - assert data == {'data': 'some data'} \ No newline at end of file diff --git a/tests/test_windowing.py b/tests/test_windowing.py index a306377..940f44b 100644 --- a/tests/test_windowing.py +++ b/tests/test_windowing.py @@ -19,6 +19,7 @@ def _make_station_frame(site_id, station_idx, basin_idx, n_days=120, start='2022 'Max Flow': rng.standard_normal(n_days), 'TMIN': rng.standard_normal(n_days), 'TMAX': rng.standard_normal(n_days), + 'precipitation': rng.standard_normal(n_days), 'SWE': rng.standard_normal(n_days), 'soil_moisture': rng.standard_normal(n_days), 'sm_observed': rng.integers(0, 2, n_days), @@ -60,6 +61,11 @@ def test_decoder_features_carry_no_flow_information(): assert 'reservoir_observed' in windowing.ENCODER_FEATURES for c in ('reservoir_storage', 'reservoir_release', 'reservoir_observed'): assert c not in windowing.DECODER_FEATURES + # Precipitation is the ONE auxiliary that IS available at forecast time + # (Open-Meteo / GFS QPF ships a 14-day daily precipitation_sum), so it + # appears in BOTH windows -- observed in the encoder, QPF in the decoder. + assert 'precipitation' in windowing.ENCODER_FEATURES + assert 'precipitation' in windowing.DECODER_FEATURES def test_build_windows_shapes_are_correct():