import os
import time
import inspect
from importlib import import_module
from osgeo import gdal
from spatialist import bbox, intersect
from spatialist.ancillary import finder
from pyroSAR import identify, identify_many, Archive
from asard.config import get_config, gdal_conf
import asard.ancillary as ancil
from asard.ard import product_info, append_metadata, get_datasets, format
gdal.UseExceptions()
from cesard import dem
from cesard.search import scene_select
from cesard.ancillary import get_max_ext, group_by_attr
import cesard.tile_extraction as tile_ex
[docs]
def main(config_file, **kwargs):
"""
Main processing function.
Parameters
----------
config_file: str or None
Path to the INI configuration file.
kwargs:
Additional arguments overriding configuration in `config_file`.
Returns
-------
"""
update = False # update existing products? Internal development flag.
config = get_config(config_file=config_file, **kwargs)
log = ancil.set_logging(config=config)
config_proc = config['processing']
processor_name = config_proc['processor']
processor = import_module(f'asard.{processor_name}')
config_sar = config[processor_name]
gdal_prms = gdal_conf(config=config)
spacings = {'AP': 10, 'IM': 10, 'WS': 60}
config_sar['spacing'] = spacings[config_proc['acq_mode'][:2]]
sar_flag = 'sar' in config_proc['mode']
nrb_flag = 'nrb' in config_proc['mode']
username, password = dem.authenticate(dem_type=config_proc['dem_type'],
username=None, password=None)
####################################################################################################################
# scene selection
log.info('collecting scenes')
db_file_set = config_proc['db_file'] is not None
scene_dir_set = config_proc['scene_dir'] is not None
if db_file_set:
archive = Archive(dbfile=config_proc['db_file'])
if scene_dir_set:
scenes = finder(target=config_proc['scene_dir'],
matchlist=[r'^SAR.*\.E[12]', r'^ASA.*\.N1'],
regex=True, recursive=True, foldermode=1)
archive.insert(scenes)
else:
raise RuntimeError('could not select a search option. Please check your configuration.')
if config_proc['scene'] is None:
attr_search = ['sensor', 'mindate', 'maxdate',
'aoi_tiles', 'aoi_geometry', 'date_strict']
dict_search = {k: config_proc[k] for k in attr_search}
dict_search['acquisition_mode'] = config_proc['acq_mode']
product = 'PRI'
if config_proc['acq_mode'].endswith('S'):
product = 'SLC'
if config_proc['acq_mode'].endswith('M'):
product = 'MR'
dict_search['product'] = product
selection, aoi_tiles = scene_select(archive=archive, **dict_search)
archive.close()
if len(selection) == 0:
log.error('could not find any scenes')
return
log.info(f'found {len(selection)} scene(s)')
scenes = identify_many(selection, sortkey='start')
else:
if config_proc['mode'] != ['sar']:
raise RuntimeError("if argument 'scene' is set, the processing mode must be 'sar'")
scenes = [identify(config_proc['scene'])]
config_proc['acq_mode'] = scenes[0].acquisition_mode
config_proc['product'] = scenes[0].product
aoi_tiles = []
# group scenes by absolute orbit number
scenes_grouped = group_by_attr(scenes, lambda x: x.meta['orbitNumber_abs'])
####################################################################################################################
# annotation layer selection
annotation = config_proc['annotation']
measurement = 'gamma'
export_extra = None
lookup = {'dm': 'layoverShadowMask',
'ei': 'incidenceAngleFromEllipsoid',
'lc': 'scatteringArea',
'ld': 'lookDirection',
'li': 'localIncidenceAngle',
'np': 'NESZ',
'gs': 'gammaSigmaRatio',
'sg': 'sigmaGammaRatio'}
if annotation is not None:
annotation = [
'gs' if x == 'ratio' and measurement == 'gamma'
else 'sg' if x == 'ratio'
else x for x in annotation]
export_extra = []
for layer in annotation:
if layer in lookup:
export_extra.append(lookup[layer])
####################################################################################################################
# main SAR processing
if sar_flag:
for h, scenes in enumerate(scenes_grouped):
log.info(f'SAR processing of group {h + 1}/{len(scenes_grouped)}')
for i, scene in enumerate(scenes):
scene_base = os.path.splitext(os.path.basename(scene.scene))[0]
out_dir_scene = os.path.join(config_proc['sar_dir'], scene_base)
tmp_dir_scene = os.path.join(config_proc['tmp_dir'], scene_base)
log.info(f'processing scene {i + 1}/{len(scenes)}: {scene.scene}')
if os.path.isdir(out_dir_scene) and not update:
log.info('Already processed - Skip!')
continue
else:
os.makedirs(out_dir_scene, exist_ok=True)
os.makedirs(tmp_dir_scene, exist_ok=True)
########################################################################################################
# Preparation of DEM for SAR processing
dem_prepare_mode = config_sar['dem_prepare_mode']
if dem_prepare_mode is not None:
fname_dem = dem.prepare(scene=scene, dem_type=config_proc['dem_type'],
dir_out=tmp_dir_scene, username=username,
password=password, mode=dem_prepare_mode,
tr=(config_sar['spacing'], config_sar['spacing']))
else:
fname_dem = None
########################################################################################################
# main processing routine
start_time = time.time()
try:
log.info('starting SAR processing')
proc_args = {'scene': scene.scene,
'outdir': config_proc['sar_dir'],
'measurement': measurement,
'tmpdir': config_proc['tmp_dir'],
'dem': fname_dem,
'export_extra': export_extra}
proc_args.update(config_sar)
sig = inspect.signature(processor.process)
accepted_params = set(sig.parameters.keys())
proc_args = {k: v for k, v in proc_args.items() if k in accepted_params}
processor.process(**proc_args)
t = round((time.time() - start_time), 2)
log.info(f'SAR processing finished in {t} seconds')
except Exception as e:
log.error(msg=e)
raise
####################################################################################################################
# NRB - final product generation
if nrb_flag:
product_type = 'NRB'
log.info(f'starting {product_type} production')
for s, scenes in enumerate(scenes_grouped):
log.info(f'ARD processing of group {s + 1}/{len(scenes_grouped)}')
log.info('preparing WBM tiles')
vec = [x.geometry() for x in scenes]
extent = get_max_ext(geometries=vec)
with bbox(coordinates=extent, crs=4326) as box:
dem.retile(vector=box, threads=gdal_prms['threads'],
dem_dir=None, wbm_dir=config_proc['wbm_dir'],
dem_type=config_proc['dem_type'],
tilenames=aoi_tiles, username=username, password=password,
dem_strict=True)
# get the geometries of all tiles that overlap with the current scene group
tiles = tile_ex.tile_from_aoi(vector=vec,
return_geometries=True,
tilenames=aoi_tiles)
del vec
t_total = len(tiles)
for t, tile in enumerate(tiles):
# select all scenes from the group whose footprint overlaps with the current tile
scenes_sub = [x for x in scenes if intersect(tile, x.geometry())]
scenes_sub_fnames = [x.scene for x in scenes_sub]
outdir = os.path.join(config_proc['ard_dir'], tile.mgrs)
os.makedirs(outdir, exist_ok=True)
fname_wbm = os.path.join(config_proc['wbm_dir'],
config_proc['dem_type'],
'{}_WBM.tif'.format(tile.mgrs))
if not os.path.isfile(fname_wbm):
fname_wbm = None
add_dem = True # add the DEM as output layer?
dem_type = config_proc['dem_type'] if add_dem else None
extent = tile.extent
epsg = tile.getProjection('epsg')
log.info(f'creating product {t + 1}/{t_total}')
log.info(f'selected scene(s): {scenes_sub_fnames}')
try:
prod_meta = product_info(product_type=product_type, src_ids=scenes_sub,
tile_id=tile.mgrs, extent=extent, epsg=epsg,
dir_out=outdir, update=update)
except RuntimeError:
log.info('Already processed - Skip!')
continue
log.info(f'product name: {os.path.join(outdir, prod_meta["product_base"])}')
try:
src_ids, sar_assets = get_datasets(scenes=scenes_sub,
sar_dir=config_proc['sar_dir'],
extent=extent, epsg=epsg,
processor_name=config_proc['processor'])
ard_assets = format(config=config, prod_meta=prod_meta,
src_ids=src_ids, sar_assets=sar_assets,
tile=tile.mgrs, extent=extent, epsg=epsg,
wbm=fname_wbm, dem_type=dem_type, compress='LERC_ZSTD',
multithread=gdal_prms['multithread'], annotation=annotation)
if ard_assets is not None:
append_metadata(config=config, prod_meta=prod_meta,
src_ids=src_ids, assets=ard_assets, compression='LERC_ZSTD')
except Exception as e:
log.error(msg=e)
raise
del tiles
gdal.SetConfigOption('GDAL_NUM_THREADS', gdal_prms['threads_before'])