import logging
import os
from pywps import FORMATS, ComplexInput, ComplexOutput, Format, LiteralInput, LiteralOutput, Process
from pywps.app.Common import Metadata
from pywps.response.status import WPS_STATUS
from pywps.inout.literaltypes import AllowedValue
from pywps.validator.allowed_value import ALLOWEDVALUETYPE
from .. import runner, util
from .utils import (default_outputs, model_experiment_ensemble, outputs_from_data_names,
outputs_from_plot_names, year_ranges, region, check_constraints)
LOGGER = logging.getLogger("PYWPS")
[docs]class RainFARM(Process):
def __init__(self):
self.variables = ['pr']
self.frequency = 'day'
inputs = [
*model_experiment_ensemble(model='ACCESS1-0',
experiment='historical',
ensemble='r1i1p1',
max_occurs=1,
required_variables=self.variables,
required_frequency=self.frequency),
*year_ranges((1997, 1999)),
*region(5, 15, 40, 50),
LiteralInput(
'target_grid',
'Target Grid',
abstract=('Target grid in degrees (e.g. 1x1) can also be the name of one '
'of the datasets to use the grid from that dataset.'),
data_type='string',
default='1x1',
),
LiteralInput(
'scheme',
'Scheme',
abstract='Regridding scheme to be used.',
data_type='string',
allowed_values=['linear', 'nearest', 'area_weighted', 'unstructured_nearest'],
default='area_weighted',
),
LiteralInput(
'slope',
'Slope',
abstract=('Spatial spectral slope (set to 0 to compute from'
'large scales).'),
data_type='float',
default=0.,
),
LiteralInput(
'nens',
'No. of ensemble members',
abstract='Number of ensemble members to be calculated.',
data_type='integer',
default=2,
allowed_values=AllowedValue(allowed_type=ALLOWEDVALUETYPE.RANGE, minval=1, maxval=1000),
),
LiteralInput(
'nf',
'No. of subdivisions',
abstract=('Number of subdivisions for downscaling (e.g. 8 will '
'produce output fields with linear resolution '
'increased by a factor 8).'),
data_type='integer',
default=8,
allowed_values=AllowedValue(allowed_type=ALLOWEDVALUETYPE.RANGE, minval=1, maxval=1000),
),
LiteralInput(
'conserv_glob',
'Conserve Global',
abstract='Conserve precipitation over full domain?',
data_type='string',
allowed_values=['true', 'false'],
default='false',
),
LiteralInput(
'conserv_smooth',
'Conserve Smooth',
abstract=('Conserve precipitation using convolution (if '
'neither is chosen box conservation is used)?'),
data_type='string',
allowed_values=['true', 'false'],
default='true',
),
]
self.datalist = [
('downscaled_data_ensemble_member_1', [FORMATS.NETCDF]),
('downscaled_data_ensemble_member_2', [FORMATS.NETCDF]),
]
outputs = [
*outputs_from_data_names(self.datalist),
ComplexOutput('archive',
'Archive',
abstract='The complete output of the ESMValTool processing as an zip archive.',
as_reference=True,
supported_formats=[Format('application/zip')]),
*default_outputs(),
]
super(RainFARM, self).__init__(
self._handler,
identifier="rainfarm",
title="RainFARM stochastic downscaling",
version=runner.VERSION,
abstract="""Precipitation extremes and small-scale variability are essential drivers in many climate change
impact studies. However, the spatial resolution currently achieved by global and regional
climate models is still insufficient to correctly identify the fine structure of precipitation
intensity fields. In the absence of a proper physically based representation, this scale gap
can be at least temporarily bridged by adopting a stochastic rainfall downscaling technique
(Rebora et al, 2006). With this aim, the Rainfall Filtered Autoregressive Model (RainFARM)was
developed to apply the stochastic precipitation downscaling method to climate models. The
selected region needs to have equal and even number of longitude (in any case it is cut) and
latitude grid points (e.g., 2x2, 4x4). Warning: downscaling can reach very high resolution,
so select a limited area. Note that only the first two ensemble members are returned as data
fields. If applicable, others are available in the output archive.
The estimated calculation time of this process is 3 minutes for the default values supplied.
""",
metadata=[
Metadata('ESMValTool', 'http://www.esmvaltool.org/'),
Metadata('Documentation',
'https://esmvaltool.readthedocs.io/en/v2.0a2/recipes/recipe_rainfarm.html',
role=util.WPS_ROLE_DOC),
],
inputs=inputs,
outputs=outputs,
status_supported=True,
store_supported=True)
def _handler(self, request, response):
response.update_status("starting ...", 0)
# build esgf search constraints
constraints = dict(
model=request.inputs['model'][0].data,
experiment=request.inputs['experiment'][0].data,
ensemble=request.inputs['ensemble'][0].data,
)
options = dict(
start_longitude=request.inputs['start_longitude'][0].data,
end_longitude=request.inputs['end_longitude'][0].data,
start_latitude=request.inputs['start_latitude'][0].data,
end_latitude=request.inputs['end_latitude'][0].data,
target_grid=request.inputs['target_grid'][0].data,
scheme=request.inputs['scheme'][0].data,
slope=request.inputs['slope'][0].data,
nens=request.inputs['nens'][0].data,
nf=request.inputs['nf'][0].data,
conserv_glob=request.inputs['conserv_glob'][0].data,
conserv_smooth=request.inputs['conserv_smooth'][0].data,
weights_climo='false',
)
# generate recipe
response.update_status("generate recipe ...", 10)
recipe_file, config_file = runner.generate_recipe(
workdir=self.workdir,
diag='rainfarm',
constraints=constraints,
options=options,
start_year=request.inputs['start_year'][0].data,
end_year=request.inputs['end_year'][0].data,
output_format='png',
)
# recipe output
response.outputs['recipe'].output_format = FORMATS.TEXT
response.outputs['recipe'].file = recipe_file
# run diag
response.update_status("running diagnostic (this could take a while)...", 20)
# Disable HDF5 library version mismatched error for rainfarm metric
os.environ["HDF5_DISABLE_VERSION_CHECK"] = "1"
result = runner.run(recipe_file, config_file)
del os.environ["HDF5_DISABLE_VERSION_CHECK"]
response.outputs['success'].data = result['success']
# log output
response.outputs['log'].output_format = FORMATS.TEXT
response.outputs['log'].file = result['logfile']
# debug log output
response.outputs['debug_log'].output_format = FORMATS.TEXT
response.outputs['debug_log'].file = result['debug_logfile']
if result['success']:
try:
self.get_outputs(result, constraints, options, response)
except Exception as e:
response.update_status("exception occured: " + str(e), 85)
else:
LOGGER.exception('esmvaltool failed!')
response.update_status("exception occured: " + result['exception'],
85)
response.update_status("creating archive of diagnostic result ...", 90)
response.outputs['archive'].output_format = Format('application/zip')
response.outputs['archive'].file = runner.compress_output(
os.path.join(self.workdir, 'output'),
os.path.join(self.workdir, 'rainfarm_result.zip'))
response.update_status("done.", 100)
return response
[docs] def get_outputs(self, result, constraints, options, response):
# result plot
response.update_status("collecting output ...", 80)
number_of_outputs = min(options['nens'], 2)
# get first two ensemble members
for ensemble_member in range(1, number_of_outputs + 1):
datakey = 'downscaled_data_ensemble_member_{}_data'.format(ensemble_member)
LOGGER.info('Setting response for: {}'.format(datakey))
response.outputs[datakey].output_format = FORMATS.NETCDF
response.outputs[datakey].file = runner.get_output(result['work_dir'],
path_filter=os.path.join('rainfarm', 'rainfarm'),
name_filter="*{:03d}".format(ensemble_member),
output_format="nc")