import os
import shutil
from spatialist.ancillary import finder
from pyroSAR import identify
from pyroSAR.snap.auxil import gpt, parse_recipe, parse_node, \
orb_parametrize
from pyroSAR.ancillary import Lock, LockCollection
from asard import osv
from cesard.tile_extraction import aoi_from_scene
from cesard.ancillary import datamask
from cesard.config import keyval_check
from cesard.snap import mli, rtc, gsr, sgr, geo, postprocess
# do not remove, needed for interface
from cesard.snap import find_datasets, lsm_encoding, version_dict
import logging
log = logging.getLogger('asard')
[docs]
def config_to_string(config):
"""
Convert the values of a configuration dictionary to strings.
Parameters
----------
config: dict
the configuration as returned by :func:`get_config_section`
Returns
-------
dict
the dictionary with the same structure but values converted to strings.
"""
out = {}
allowed = get_config_keys()
for k, v in config.items():
if k not in allowed:
raise ValueError(f'key {k} not in allowed keys: {allowed}')
if v is None:
out[k] = 'None'
elif k in ['clean_edges', 'clean_edges_pixels', 'cleanup']:
out[k] = str(v)
elif k == 'gpt_args' and isinstance(v, list):
out[k] = ' '.join(v)
else:
out[k] = v
return out
[docs]
def get_config_keys():
"""
Get all allowed configuration keys.
Returns
-------
List[str]
"""
return ['clean_edges', 'clean_edges_pixels', 'cleanup',
'dem_resampling_method', 'gpt_args', 'img_resampling_method']
[docs]
def get_config_section(parser, **kwargs):
"""
Get the`config.ini` `SNAP` section content as a dictionary.
Parameters
----------
parser: configparser.ConfigParser
kwargs: dict[str]
Returns
-------
dict
"""
out = {}
defaults = {
'cleanup': 'True',
'clean_edges': 'True',
'clean_edges_pixels': '4',
'dem_resampling_method': 'BILINEAR_INTERPOLATION',
'gpt_args': 'None',
'img_resampling_method': 'BILINEAR_INTERPOLATION',
}
if 'SNAP' in parser.sections():
section = parser['SNAP']
for k, v in defaults.items():
if k not in section and k not in kwargs:
section[k] = v
else:
parser.read_dict({'SNAP': defaults})
section = parser['SNAP']
kwargs_str = config_to_string(kwargs)
for k, v in kwargs_str.items():
if k in get_config_keys():
section[k] = v
for k, v in section.items():
v = keyval_check(key=k, val=v, allowed_keys=get_config_keys())
if k == 'gpt_args':
if v is not None:
v = v.split(' ')
if k == 'clean_edges_pixels':
v = section.getint(k)
if k in ['allow_res_osv', 'clean_edges', 'cleanup']:
v = section.getboolean(k)
out[k] = v
out['dem_prepare_mode'] = 'single-4326'
return out
[docs]
def pre(src, dst, workflow, allow_res_osv=True,
output_beta0=True, output_sigma0=True,
output_gamma0=False, gpt_args=None):
"""
General SAR preprocessing. The following operators are used (optional steps in brackets):
Apply-Orbit-File->Calibration
Parameters
----------
src: str
the file name of the source scene
dst: str
the file name of the target scene. Format is BEAM-DIMAP.
workflow: str
the output SNAP XML workflow filename.
output_beta0:
output beta nought backscatter needed for RTC?
output_sigma0: bool
output sigma nought backscatter needed for NESZ?
output_gamma0: bool
output gamma nought backscatter needed?
gpt_args: list[str] or None
a list of additional arguments to be passed to the gpt call
- e.g. ``['-x', '-c', '2048M']`` for increased tile cache size and intermediate clearing
"""
scene = identify(src)
if not os.path.isfile(workflow):
polarizations = scene.polarizations
wf = parse_recipe('blank')
############################################
read = parse_node('Read')
read.parameters['file'] = scene.scene
wf.insert_node(read)
############################################
if scene.sensor == 'ASAR':
osv_type = 'DORIS Precise VOR (ENVISAT) (Auto Download)'
suffix = 'ASAR'
else:
osv_type = 'DELFT Precise (ENVISAT, ERS1&2) (Auto Download)'
suffix = 'ERS'
osv_user_var = f'ASARD_OSV_USER_{suffix}'
osv_pass_var = osv_user_var.replace('USER', 'PASS')
osv_user = os.environ.get(osv_user_var)
osv_pass = os.environ.get(osv_pass_var)
offline = False
if osv_user is None or osv_pass is None:
offline = True
osv.get(scene=scene, type=osv_type[:5], offline=offline,
username=osv_user, password=osv_pass)
orb = parse_node('Apply-Orbit-File')
orb.parameters['orbitType'] = osv_type
orb.parameters['continueOnFail'] = False
wf.insert_node(orb, before=read.id)
last = orb
############################################
cal = parse_node('Calibration')
wf.insert_node(cal, before=last.id)
if scene.sensor == 'ASAR':
if scene.acquisition_mode in ['APP', 'APS', 'IMS']:
source_bands = [f'Intensity_{x}' for x in polarizations]
elif scene.acquisition_mode in ['IMP', 'WSM']:
source_bands = 'Intensity'
else:
raise ValueError(f'Unsupported acquisition mode: '
f'{scene.acquisition_mode}')
elif scene.sensor in ['ERS1', 'ERS2']:
source_bands = 'Intensity'
else:
raise ValueError(f'Unsupported sensor: {scene.sensor}')
cal.parameters['sourceBands'] = source_bands
# SNAP will fail when this is set
# cal.parameters['selectedPolarisations'] = polarizations
cal.parameters['createBetaBand'] = False
cal.parameters['createGammaBand'] = False
cal.parameters['outputBetaBand'] = output_beta0
cal.parameters['outputSigmaBand'] = output_sigma0
cal.parameters['outputGammaBand'] = output_gamma0
last = cal
############################################
write = parse_node('Write')
wf.insert_node(write, before=last.id)
write.parameters['file'] = dst
write.parameters['formatName'] = 'BEAM-DIMAP'
############################################
wf.write(workflow)
if not os.path.isfile(dst):
gpt(xmlfile=workflow, tmpdir=os.path.dirname(dst),
gpt_args=gpt_args)
[docs]
def process(scene, outdir, measurement, spacing, dem,
dem_resampling_method='BILINEAR_INTERPOLATION',
img_resampling_method='BILINEAR_INTERPOLATION',
rlks=None, azlks=None, tmpdir=None, export_extra=None,
allow_res_osv=True, clean_edges=True, clean_edges_pixels=4,
gpt_args=None, cleanup=True):
"""
Main function for SAR processing with SNAP.
Parameters
----------
scene: str
The SAR scene file name.
outdir: str
The output directory for storing the final results.
measurement: {'sigma', 'gamma'}
the backscatter measurement convention:
- gamma: RTC gamma nought (:math:`\\gamma^0_T`)
- sigma: RTC sigma nought (:math:`\\sigma^0_T`)
spacing: int or float
The output pixel spacing in meters.
dem: str
The DEM filename. Can be created with :func:`cesard.dem.mosaic`.
dem_resampling_method: str
The DEM resampling method.
img_resampling_method: str
The image resampling method.
rlks: int or None
The number of range looks.
azlks: int or None
The number of azimuth looks.
tmpdir: str or None
Path to a temporary directory for intermediate products.
export_extra: list[str] or None
A list of ancillary layers to create. Default None: do not create any ancillary layers.
Options:
- DEM
- gammaSigmaRatio: :math:`\\sigma^0_T / \\gamma^0_T`
- sigmaGammaRatio: :math:`\\gamma^0_T / \\sigma^0_T`
- incidenceAngleFromEllipsoid
- layoverShadowMask
- localIncidenceAngle
- NESZ: noise equivalent sigma zero
- projectedLocalIncidenceAngle
- scatteringArea
- lookDirection: range look direction angle
clean_edges:
Erode noisy image edges? See :func:`pyroSAR.snap.auxil.erode_edges`.
Does not apply to layover-shadow mask.
clean_edges_pixels: int
The number of pixels to erode.
gpt_args: list[str] or None
a list of additional arguments to be passed to the gpt call
- e.g. ``['-x', '-c', '2048M']`` for increased tile cache size and intermediate clearing
cleanup: bool
Delete intermediate files after successful process termination?
Returns
-------
Examples
--------
>>> from asard import snap
>>> scene = 'ASA_IMP_1PNESA20110820_092721_000000173105_00381_49533_0000.N1'
>>> dem = 'ASA_IMP_1PNESA20110820_092721_000000173105_00381_49533_0000_DEM_GLO30.tif'
>>> outdir = '.'
>>> spacing = 10
>>> export_extra = ['localIncidenceAngle', 'incidenceAngleFromEllipsoid',
>>> 'scatteringArea', 'layoverShadowMask', 'gammaSigmaRatio']
>>> snap.process(scene=scene, outdir=outdir, spacing=spacing, dem=dem,
>>> export_extra=export_extra)
"""
if measurement not in ['gamma', 'sigma']:
raise RuntimeError("'measurement' must either be 'gamma' or 'sigma'")
if export_extra is None:
export_extra = []
basename = os.path.splitext(os.path.basename(scene))[0]
outdir_scene = os.path.join(outdir, basename)
if tmpdir is None:
tmpdir = outdir
tmpdir_scene = os.path.join(tmpdir, basename + '_tmp')
else:
tmpdir_scene = os.path.join(tmpdir, basename)
os.makedirs(outdir_scene, exist_ok=True)
os.makedirs(tmpdir_scene, exist_ok=True)
out_base = os.path.join(outdir_scene, basename)
tmp_base = os.path.join(tmpdir_scene, basename)
id = identify(scene)
workflows = []
apply_rtc = True
############################################################################
# general pre-processing
out_pre = tmp_base + '_pre.dim'
out_pre_wf = out_pre.replace('.dim', '.xml')
workflows.append(out_pre_wf)
with Lock(out_pre):
if not os.path.isfile(out_pre):
log.info('preprocessing main scene')
pre(src=scene, dst=out_pre, workflow=out_pre_wf,
output_beta0=apply_rtc, gpt_args=gpt_args)
else:
log.info('main scene has already been preprocessed')
############################################################################
# multi-looking
out_mli = tmp_base + '_mli.dim'
out_mli_wf = out_mli.replace('.dim', '.xml')
with Lock(out_pre, soft=True):
with Lock(out_mli):
if not os.path.isfile(out_mli):
mli(src=out_pre, dst=out_mli, workflow=out_mli_wf,
spacing=spacing, rlks=rlks, azlks=azlks, gpt_args=gpt_args)
if not os.path.isfile(out_mli):
out_mli = out_pre
else:
workflows.append(out_mli_wf)
############################################################################
# radiometric terrain flattening
out_rtc = out_gsr = out_sgr = None
if apply_rtc:
out_rtc = tmp_base + '_rtc.dim'
out_rtc_wf = out_rtc.replace('.dim', '.xml')
workflows.append(out_rtc_wf)
output_sigma0_rtc = measurement == 'sigma' or 'gammaSigmaRatio' in export_extra
with LockCollection([out_mli, dem], soft=True):
with Lock(out_rtc):
if not os.path.isfile(out_rtc):
log.info('radiometric terrain correction')
rtc(src=out_mli, dst=out_rtc, workflow=out_rtc_wf, dem=dem,
dem_resampling_method=dem_resampling_method,
sigma0=output_sigma0_rtc,
scattering_area='scatteringArea' in export_extra,
gpt_args=gpt_args)
########################################################################
# gamma-sigma ratio computation
out_gsr = None
if 'gammaSigmaRatio' in export_extra:
out_gsr = tmp_base + '_gsr.dim'
out_gsr_wf = out_gsr.replace('.dim', '.xml')
workflows.append(out_gsr_wf)
with Lock(out_rtc, soft=True):
with Lock(out_gsr):
if not os.path.isfile(out_gsr):
log.info('computing gamma-sigma ratio')
gsr(src=out_rtc, dst=out_gsr, workflow=out_gsr_wf,
gpt_args=gpt_args)
########################################################################
# sigma-gamma ratio computation
out_sgr = None
if 'sigmaGammaRatio' in export_extra:
out_sgr = tmp_base + '_sgr.dim'
out_sgr_wf = out_sgr.replace('.dim', '.xml')
workflows.append(out_sgr_wf)
with Lock(out_rtc, soft=True):
with Lock(out_sgr):
if not os.path.isfile(out_sgr):
log.info('computing sigma-gamma ratio')
sgr(src=out_rtc, dst=out_sgr, workflow=out_sgr_wf,
gpt_args=gpt_args)
############################################################################
# geocoding
# Process to multiple UTM zones or just one?
# For testing purposes only.
utm_multi = True
def run():
out_geo = out_base + '_geo_{}.dim'.format(epsg)
out_geo_wf = out_geo.replace('.dim', '.xml')
sources = list(filter(None, [out_mli, out_rtc, out_gsr, out_sgr]))
with LockCollection(sources, soft=True):
with Lock(out_geo):
if not os.path.isfile(out_geo):
log.info(f'geocoding to EPSG:{epsg}')
scene1 = identify(out_mli)
pols = scene1.polarizations
bands0 = ['NESZ_{}'.format(pol) for pol in pols]
if measurement == 'gamma':
bands1 = ['Gamma0_{}'.format(pol) for pol in pols]
else:
bands0.extend(['Sigma0_{}'.format(pol) for pol in pols])
bands1 = []
if 'scatteringArea' in export_extra:
bands1.append('simulatedImage')
if 'lookDirection' in export_extra:
bands0.append('lookDirection')
geo(*sources,
dst=out_geo, workflow=out_geo_wf,
spacing=spacing, crs=epsg, geometry=ext,
export_extra=export_extra,
standard_grid_origin_x=align_x,
standard_grid_origin_y=align_y,
bands0=bands0, bands1=bands1, dem=dem,
dem_resampling_method=dem_resampling_method,
img_resampling_method=img_resampling_method,
gpt_args=gpt_args)
log.info('edge cleaning')
postprocess(out_geo, clean_edges=clean_edges,
clean_edges_pixels=clean_edges_pixels)
log.info('creating valid data masks')
out_geo_data = out_geo.replace('.dim', '.data')
pattern = r'(?:Gamma0|Sigma0)_[VH]{2}\.img$'
measurements = finder(out_geo_data, [pattern],
regex=True, recursive=False)
dm_ras = os.path.join(out_geo_data, 'datamask.tif')
dm_vec = dm_ras.replace('.tif', '.gpkg')
dm_vec = datamask(measurement=measurements[0],
dm_ras=dm_ras, dm_vec=dm_vec)
else:
log.info(f'geocoding to EPSG:{epsg} has already been performed')
for wf in workflows:
wf_dst = os.path.join(outdir_scene, os.path.basename(wf))
if wf != wf_dst and not os.path.isfile(wf_dst):
shutil.copyfile(src=wf, dst=wf_dst)
log.info('determining UTM zone overlaps')
aois = aoi_from_scene(scene=id, multi=utm_multi)
for aoi in aois:
ext = aoi['extent']
epsg = aoi['epsg']
align_x = aoi['extent_utm']['xmin']
align_y = aoi['extent_utm']['ymax']
run()
############################################################################
# delete intermediate files
if cleanup:
log.info('cleaning up')
shutil.rmtree(tmpdir_scene)