Source code for asard.osv

import os
import re
import numpy as np
import pandas as pd
from urllib.parse import urlparse
from ftplib import FTP, FTP_TLS
from pathlib import Path
from zipfile import ZipFile, ZIP_DEFLATED
import tarfile as tf
from datetime import datetime, timezone
from dateutil.parser import parse as dateparse
from dateutil.relativedelta import relativedelta
from pyroSAR.drivers import ID
from pyroSAR.examine import ExamineSnap
from spatialist.ancillary import finder
from typing import Literal
import logging

# module-level logger; all messages emitted below go to the logger named 'asard'
log = logging.getLogger('asard')

_osv_asar = (r'DOR_VOR_AXVF-P'
             r'(?P<publish>[0-9]{8}_[0-9]{6})_'
             r'(?P<start>[0-9]{8}_[0-9]{6})_'
             r'(?P<stop>[0-9]{8}_[0-9]{6})'
             r'(?:\.zip|)')


def _asar_osv_meta(
        file: str
) -> dict[str, str | datetime]:
    """
    Read metadata from the ASAR OSV file name.
    Timestamps are converted to timezone-aware datetime objects.

    Parameters
    ----------
    file:
        the DORIS OSV file name.

    Returns
    -------
        the metadata dictionary
    """
    meta = re.match(_osv_asar, os.path.basename(file)).groupdict()
    for date in ['publish', 'start', 'stop']:
        dt = datetime.strptime(meta[date], '%Y%m%d_%H%M%S')
        meta[date] = dt.replace(tzinfo=timezone.utc)
    return meta


def _download_ers_reaper(
        username: str,
        password: str,
        target: str,
        satellite: str | list[str],
        timeout: int = 60
):
    """
    Download and unpack the full-mission REAPER orbit archives for ERS.

    One archive per satellite is retrieved via FTPS and extracted into the
    directory it is stored in. A satellite whose archive already exists
    locally is skipped.

    Parameters
    ----------
    username:
        the FTP access username
    password:
        the FTP access password
    target:
        the download location; files are stored in the subdirectory
        `Orbits/REAPER/POD_REAPER_v2_FullMission/DEOS/<satellite>`.
    satellite:
        one satellite name or a list of names; used as remote and local
        folder name and (in lower case) as part of the archive name.
    timeout:
        the connection timeout in seconds

    Returns
    -------

    """
    url = ('ftps://ersorb-ftp-ds.eo.esa.int/'
           'ERS_Orbit_files/POD_REAPER_v2_FullMission')
    parsed = urlparse(url)
    if isinstance(satellite, str):
        satellite = [satellite]
    # Use the 'data' extraction filter where the running Python offers it so
    # that archive members cannot be written outside the destination folder.
    extract_kwargs = {'filter': 'data'} if hasattr(tf, 'data_filter') else {}
    # The context manager closes the connection even if a transfer fails.
    with FTP_TLS() as ftp:
        ftp.connect(host=parsed.netloc, timeout=timeout, port=21)
        ftp.auth()
        ftp.login(username, password)
        ftp.prot_p()
        ftp.cwd(parsed.path)
        for sat in satellite:
            base = f'deos{sat.lower()}tar.gz'
            remote = f'{parsed.path}/{sat}/{base}'
            local = (Path(target) / 'Orbits' / 'REAPER' /
                     'POD_REAPER_v2_FullMission' / 'DEOS' / sat / base)
            local.parent.mkdir(parents=True, exist_ok=True)
            if local.exists():
                continue
            log.info(f'{local} <<-- {remote}')
            # Download and unpack under a temporary name. The final name is
            # the marker for "already done" and must only appear once both
            # steps succeeded; otherwise a broken archive would be skipped
            # forever by subsequent calls.
            partial = local.with_name(local.name + '.part')
            try:
                with open(partial, 'wb') as f:
                    ftp.retrbinary(f"RETR {remote}", f.write)
                with tf.open(partial, 'r') as archive:
                    archive.extractall(local.parent, **extract_kwargs)
                partial.replace(local)
            finally:
                partial.unlink(missing_ok=True)


def _download_ers_delft(
        satellite: str | list[str],
        target: str | None = None,
        start: datetime | None = None,
        stop: datetime | None = None,
        timeout: int = 60
) -> None:
    """
    Download Delft orbit files (ODR) for ERS via anonymous FTP.

    For each satellite the `arclist` index is retrieved (unless it already
    exists locally) and read with :func:`_read_arclist`. All arcs overlapping
    the requested time range are then downloaded. Files that already exist
    locally are skipped.

    Parameters
    ----------
    satellite:
        ERS1, ERS2 or a list of both.
    target:
        the download location. Default None: use the configured SNAP
        auxiliary data path
        (See :meth:`pyroSAR.examine.ExamineSnap.auxdatapath`).
        Files are stored in the subdirectory
        `Orbits/Delft Precise Orbits/ODR.<satellite>/dgm-e04`.
    start:
        the acquisition start. Default None: no limit.
        Naive datetime objects are interpreted as UTC.
    stop:
        the acquisition stop. Default None: no limit.
        Naive datetime objects are interpreted as UTC.
    timeout:
        the connection timeout in seconds

    Returns
    -------

    Raises
    ------
    KeyError
        if a satellite other than ERS1 or ERS2 is requested
    """
    if isinstance(satellite, str):
        satellite = [satellite]
    satellites = [{'ERS1': 'ERS-1', 'ERS2': 'ERS-2'}[x] for x in satellite]
    if target is None:
        # same fallback as the public download functions
        target = ExamineSnap().auxdatapath
    # the arclist time stamps are timezone-aware (UTC); comparing them with
    # naive datetime objects would raise a TypeError
    if start is not None and start.tzinfo is None:
        start = start.replace(tzinfo=timezone.utc)
    if stop is not None and stop.tzinfo is None:
        stop = stop.replace(tzinfo=timezone.utc)
    url = 'ftp://dutlru2.lr.tudelft.nl/pub/orbits'
    sub = 'dgm-e04'
    parsed = urlparse(url)
    # remote paths are absolute, so only scheme and host are prepended for
    # logging (prepending the full URL would duplicate the path)
    server = f'{parsed.scheme}://{parsed.netloc}'

    def retrieve(connection: FTP, remote: str, local: Path) -> None:
        """Download `remote` to `local` unless `local` already exists."""
        if local.exists():
            return
        log.info(f'{local} <<-- {server}{remote}')
        # write to a temporary name first so that an interrupted transfer
        # does not leave a truncated file that is later taken as complete
        partial = local.with_name(local.name + '.part')
        try:
            with open(partial, 'wb') as f:
                connection.retrbinary(f"RETR {remote}", f.write)
            partial.replace(local)
        finally:
            partial.unlink(missing_ok=True)

    # the context manager closes the connection even if a transfer fails
    with FTP(parsed.netloc, timeout=timeout) as ftp:
        ftp.login()
        for sat in satellites:
            remote_dir = f'{parsed.path}/ODR.{sat}/{sub}'
            local_dir = (Path(target) / 'Orbits' / 'Delft Precise Orbits' /
                         f'ODR.{sat}' / sub)
            local_dir.mkdir(parents=True, exist_ok=True)
            arclist_file = local_dir / 'arclist'
            retrieve(ftp, f'{remote_dir}/arclist', arclist_file)
            arclist = _read_arclist(arclist_file)
            # keep only arcs overlapping the requested time range
            if start is not None:
                arclist = arclist[arclist['stop'] >= start]
            if stop is not None:
                arclist = arclist[arclist['start'] <= stop]
            for arc in arclist['arc'].to_list():
                basename = f'ODR.{arc}'
                retrieve(ftp, f'{remote_dir}/{basename}',
                         local_dir / basename)


def _read_arclist(arclist: str | Path) -> pd.DataFrame:
    pattern = ('(?P<arc>[0-9]{3})  '
               '(?P<start>[0-9 :]{12}) - '
               '(?P<stop>[0-9 :]{12}) '
               '(?P<slr>[0-9. ]{5}) '
               '(?P<xover>[0-9. ]{4}) '
               '(?P<altim>[0-9. ]{5}) '
               '(?P<repeat>[0-9. ]{7}) '
               '(?P<ver>[0-9 ]{4}) '
               '(?P<begin>[0-9 :]{15})'
               )
    with open(arclist, 'r') as f:
        for line in f:
            if line.startswith('Arc#'):
                break
        entries = []
        for line in f:
            entry = re.search(pattern, line).groupdict()
            entry['ver'] = int(entry['ver'])
            for key in ['slr', 'xover', 'altim', 'repeat']:
                if set(entry[key]).pop() == ' ':
                    entry[key] = None
                else:
                    entry[key] = float(entry[key])
            decade = '19' if entry['start'].startswith('9') else '20'
            for key in ['start', 'stop', 'begin']:
                date = dateparse(decade + entry[key])
                date = date.replace(tzinfo=timezone.utc)
                entry[key] = date
            entries.append(entry)
    return pd.DataFrame(entries)


def download_asar(
        username: str,
        password: str,
        target: str | None = None,
        start: datetime | None = None,
        stop: datetime | None = None,
        timeout: int = 60
) -> None:
    """
    Download DORIS OSV files from ftps://envorb-ftp-ds.eo.esa.int.

    The server content is organized in monthly folders (named YYYYmm).
    Each selected file is stored locally as a zip archive in
    `<target>/Orbits/Doris/vor/<folder>`. Files that already exist locally
    are skipped.

    Parameters
    ----------
    username:
        the FTP access username
    password:
        the FTP access password
    target:
        the download location. Default None: Store data in a subdirectory
        of the configured SNAP auxiliary data path
        (See :meth:`pyroSAR.examine.ExamineSnap.auxdatapath`).
    start:
        the acquisition start. Default None: no acquisition start limit.
        Naive datetime objects are interpreted as UTC.
    stop:
        the acquisition stop. Default None: no acquisition stop limit.
        Naive datetime objects are interpreted as UTC.
    timeout:
        the connection timeout in seconds

    Returns
    -------

    """
    if target is None:
        target = ExamineSnap().auxdatapath
    # the file name time stamps are timezone-aware (UTC); comparing them with
    # naive datetime objects would raise a TypeError
    if start is not None and start.tzinfo is None:
        start = start.replace(tzinfo=timezone.utc)
    if stop is not None and stop.tzinfo is None:
        stop = stop.replace(tzinfo=timezone.utc)
    # monthly folder limits; None means unbounded
    folder_min = None if start is None else start.strftime('%Y%m')
    folder_max = None if stop is None else stop.strftime('%Y%m')

    url = 'ftps://envorb-ftp-ds.eo.esa.int/Envisat_DORIS/DOR_VOR_AX_f'
    parsed = urlparse(url)
    # the context manager closes the connection even if a transfer fails
    with FTP_TLS() as ftp:
        ftp.connect(host=parsed.netloc, timeout=timeout, port=21)
        ftp.auth()
        ftp.login(username, password)
        ftp.prot_p()
        ftp.cwd(parsed.path)
        folders = ftp.nlst()
        for folder in folders:
            if folder_min is not None and folder < folder_min:
                continue
            if folder_max is not None and folder > folder_max:
                continue
            ftp.cwd(parsed.path + '/' + folder)
            for file in ftp.nlst():
                if re.match(_osv_asar, os.path.basename(file)) is None:
                    # not an OSV file; its name cannot be interpreted
                    continue
                meta = _asar_osv_meta(file)
                if start is not None and meta['stop'] < start:
                    continue
                if stop is not None and meta['start'] > stop:
                    continue
                remote = f'{parsed.path}/{folder}/{file}'
                local = (Path(target) / 'Orbits' / 'Doris' / 'vor' /
                         folder / (file + '.zip'))
                local.parent.mkdir(parents=True, exist_ok=True)
                if local.exists():
                    continue
                log.info(f'{local} <<-- {remote}')
                # Write to a temporary name first. The final name is the
                # marker for "already downloaded" and must not point to a
                # truncated archive after an interrupted transfer.
                partial = local.with_name(local.name + '.part')
                try:
                    with ZipFile(partial, mode="w",
                                 compression=ZIP_DEFLATED) as zf:
                        # stream the remote file directly into the archive
                        with zf.open(Path(remote).name, mode="w") as zf_file:
                            ftp.retrbinary(f"RETR {remote}", zf_file.write)
                    partial.replace(local)
                finally:
                    partial.unlink(missing_ok=True)
[docs] def download_ers( type: Literal["DELFT", "REAPER"], username: str | None = None, password: str | None = None, satellite: Literal["ERS1", "ERS2"] | None = None, target: str | None = None, start: datetime | None = None, stop: datetime | None = None, ) -> None: """ Download ERS-1/2 OSV files. Parameters ---------- username: the FTP access username. Only needed for REAPER products. password: the FTP access password. Only needed for REAPER products. type: the OSV type to download; supported options: - DELFT (ftp://dutlru2.lr.tudelft.nl/pub/orbits/ODR.ERS-{1|2}/dgm-e04/arclist) - REAPER (ftps://ersorb-ftp-ds.eo.esa.int/ERS_Orbit_files/POD_REAPER_v2_FullMission) satellite: Either ERS1, ERS2 or None (both satellites). target: the download location. Default None: Store data in a subdirectory of the configured SNAP auxiliary data path (See :meth:`pyroSAR.examine.ExamineSnap.auxdatapath`). start: the acquisition start. Default None: no limit stop: the acquisition stop. Default None: no limit """ satellite_options = ['ERS1', 'ERS2'] if satellite is None: satellites = satellite_options else: if satellite not in satellite_options: raise ValueError(f'satellite {satellite} is not supported') else: satellites = [satellite] if target is None: target = ExamineSnap().auxdatapath if type == 'DELFT': _download_ers_delft(target=target, satellite=satellites, start=start, stop=stop) elif type == 'REAPER': _download_ers_reaper(username=username, password=password, target=target, satellite=satellites) else: raise ValueError(f"unknown OSV type: '{type}'")
def _find_doris_osv(
        target: str,
        start: datetime,
        stop: datetime
) -> str | None:
    """Search the local DORIS OSV file best covering `start` to `stop`."""
    location = Path(target) / 'Orbits' / 'Doris' / 'vor'
    # files are searched in the monthly folders of `start` and the month
    # before
    month_previous = (start - relativedelta(months=1)).strftime('%Y%m')
    month_current = start.strftime('%Y%m')
    folders = [location / month_previous, location / month_current]
    folders = [x for x in folders if x.exists()]
    candidates = []
    for folder in folders:
        files = finder(target=str(folder), matchlist=[_osv_asar],
                       recursive=False, regex=True)
        for file in files:
            meta = _asar_osv_meta(file)
            if meta['start'] < start < meta['stop']:
                candidates.append((file, meta))
    if len(candidates) == 0:
        return None
    if len(candidates) == 1:
        return candidates[0][0]
    # determine the OSV file(s), whose range best covers that of the scene
    # a balanced score is computed optimizing the time range covered by the
    # OSV file before and after the scene acquisition
    a_num = np.array([(start - meta['start']).total_seconds()
                      for _, meta in candidates])
    b_num = np.array([(meta['stop'] - stop).total_seconds()
                      for _, meta in candidates])
    a_norm = (a_num - a_num.min() + 1) / (a_num.max() - a_num.min() + 1)
    b_norm = (b_num - b_num.min() + 1) / (b_num.max() - b_num.min() + 1)
    score = a_norm + b_norm
    indices = np.flatnonzero(score == score.max())
    # If multiple files have the same score, return the last published file.
    # `indices` holds positions in `candidates`, so the winner is selected
    # among these positions directly.
    best = max(indices, key=lambda i: candidates[i][1]['publish'])
    return candidates[best][0]


def _find_delft_osv(
        target: str,
        sensor: str,
        start: datetime,
        stop: datetime
) -> str | None:
    """Search the local Delft ODR file covering `start` to `stop`."""
    sat = {'ERS1': 'ERS-1', 'ERS2': 'ERS-2'}[sensor]
    sub = 'dgm-e04'
    location = (Path(target) / 'Orbits' / 'Delft Precise Orbits' /
                f'ODR.{sat}' / sub)
    arclist_file = location / 'arclist'
    if not arclist_file.exists():
        # nothing has been downloaded yet (e.g. in offline mode)
        return None
    arclist = _read_arclist(arclist_file)
    if arclist.empty:
        return None
    candidates = arclist[(arclist['stop'] >= start) &
                         (arclist['start'] <= stop) &
                         (arclist['begin'] <= start)]
    if len(candidates) == 0:
        return None
    arc = candidates.iloc[0]['arc']
    osv = location / f'ODR.{arc}'
    if not osv.exists():
        return None
    return str(osv)


def _find_reaper_osv(
        target: str,
        sensor: str,
        scene_start: str
) -> str | None:
    """Search the local REAPER sp3 file for the day of `scene_start`."""
    location = (Path(target) / 'Orbits' / 'REAPER' /
                'POD_REAPER_v2_FullMission' / 'DEOS' / sensor)
    prefix = 'xxo' if sensor == 'ERS1' else 'adr'
    # characters 2 to 7 of the scene start string form the date part of the
    # file name
    base = f'{prefix}.{scene_start[2:8]}.sp3'
    osv = location / base
    if not osv.exists():
        return None
    return str(osv)


def get(
        scene: ID,
        type: Literal["DELFT", "DORIS", "REAPER"],
        offline: bool = False,
        username: str | None = None,
        password: str | None = None,
        target: str | None = None
) -> str | None:
    """
    Get the matching OSV file for a satellite product.

    Unless `offline` is True, the files for the acquisition time of the
    scene (extended by 8 minutes before and after) are downloaded first.
    The local files are then searched for the best match.

    Parameters
    ----------
    scene:
        the satellite product object
    type:
        the OSV type to download; supported options:

        - ASAR: DORIS
        - ERS: DELFT, REAPER
    offline:
        only search offline?
    username:
        the FTP access username
    password:
        the FTP access password
    target:
        the download location. Default None: Store data in a subdirectory
        of the configured SNAP auxiliary data path
        (See :meth:`pyroSAR.examine.ExamineSnap.auxdatapath`).

    Returns
    -------
        either the name of an existing file or None if no file was found

    Raises
    ------
    ValueError
        if the sensor of the scene is unknown or if the OSV type is not
        supported for this sensor
    """
    # The plural keyword `minutes` defines a relative offset. The singular
    # `minute` would instead set the minute field of the time stamp to 8.
    buffer = relativedelta(minutes=8)
    start = scene.start_dt - buffer
    stop = scene.stop_dt + buffer
    if target is None:
        target = ExamineSnap().auxdatapath

    if scene.sensor == 'ASAR':
        if type not in ['DORIS']:
            raise ValueError(f"unsupported OSV type '{type}' "
                             f"for sensor '{scene.sensor}'")
        if not offline:
            download_asar(username=username, password=password,
                          target=target, start=start, stop=stop)
        return _find_doris_osv(target=target, start=start, stop=stop)
    elif scene.sensor in ['ERS1', 'ERS2']:
        if type not in ['DELFT', 'REAPER']:
            raise ValueError(f"unsupported OSV type '{type}' "
                             f"for sensor '{scene.sensor}'")
        if not offline:
            # pass the buffered datetime objects (as in the ASAR case), not
            # the string attributes scene.start/scene.stop
            download_ers(username=username, password=password,
                         target=target, satellite=scene.sensor,
                         type=type, start=start, stop=stop)
        if type == 'DELFT':
            return _find_delft_osv(target=target, sensor=scene.sensor,
                                   start=start, stop=stop)
        return _find_reaper_osv(target=target, sensor=scene.sensor,
                                scene_start=scene.start)
    else:
        raise ValueError(f'unknown sensor: {scene.sensor}')