import logging
import os
from pywps import FORMATS, ComplexInput, ComplexOutput, Format, LiteralInput, LiteralOutput, Process
from pywps.app.Common import Metadata
from pywps.response.status import WPS_STATUS
from pywps.inout.literaltypes import AllowedValue
from pywps.validator.allowed_value import ALLOWEDVALUETYPE
from .. import runner, util
from .utils import default_outputs, model_experiment_ensemble, year_ranges, check_constraints
LOGGER = logging.getLogger("PYWPS")
[docs]class EnsClus(Process):
def __init__(self):
self.variables = ['pr', 'tas']
self.frequency = 'mon'
inputs = [
*model_experiment_ensemble(model='ACCESS1-0',
experiment='historical',
ensemble='r1i1p1',
min_occurs=3,
required_variables=self.variables,
required_frequency=self.frequency),
*year_ranges((1900, 2005)),
LiteralInput('variable',
'Variable',
abstract='Select the variable to simulate.',
data_type='string',
default='pr',
allowed_values=['pr', 'tas']),
LiteralInput(
'season',
'Season',
abstract='Choose a season like DJF.',
data_type='string',
allowed_values=['DJF', 'DJFM', 'NDJFM', 'JJA'],
default='JJA',
),
LiteralInput(
'area',
'Area',
abstract='Area over which to calculate.',
data_type='string',
allowed_values=['EU', 'EAT', 'PNA', 'NH'],
default='EU',
),
LiteralInput(
'extreme',
'Extreme',
abstract='Extreme metric.',
data_type='string',
allowed_values=[
'60th_percentile', '75th_percentile', '90th_percentile', 'mean', 'maximum', 'std', 'trend'
],
default='75th_percentile',
),
LiteralInput(
'numclus',
'Number of Clusters',
abstract='Number of clusters.',
data_type='integer',
default=2,
allowed_values=AllowedValue(allowed_type=ALLOWEDVALUETYPE.RANGE, minval=1, maxval=1000),
),
LiteralInput(
'perc',
'Percentage',
abstract='Percentage of total Variance',
data_type='integer',
default='80',
allowed_values=AllowedValue(allowed_type=ALLOWEDVALUETYPE.RANGE, minval=0, maxval=100),
),
LiteralInput(
'numpcs',
'Number of PCs',
abstract='Number of PCs to retain. Has priority over Percentage unless set to 0',
data_type='integer',
default='0',
allowed_values=AllowedValue(allowed_type=ALLOWEDVALUETYPE.RANGE, minval=0, maxval=1000),
),
]
outputs = [
ComplexOutput('plot',
'Output plot',
abstract='Generated output plot of ESMValTool processing.',
as_reference=True,
supported_formats=[Format('image/eps')]),
ComplexOutput('ens_extreme',
'ens_extreme',
abstract='Generated output data of ESMValTool processing.',
as_reference=True,
supported_formats=[FORMATS.NETCDF]),
ComplexOutput('ens_climatologies',
'ens_climatologies',
abstract='Generated output data of ESMValTool processing.',
as_reference=True,
supported_formats=[FORMATS.NETCDF]),
ComplexOutput('ens_anomalies',
'ens_anomalies',
abstract='Generated output data of ESMValTool processing.',
as_reference=True,
supported_formats=[FORMATS.NETCDF]),
ComplexOutput('statistics',
'Statistics',
abstract='Clustering Statistics',
as_reference=True,
supported_formats=[Format('text/plain')]),
ComplexOutput('archive',
'Archive',
abstract='The complete output of the ESMValTool processing as an zip archive.',
as_reference=True,
supported_formats=[Format('application/zip')]),
*default_outputs(),
]
super(EnsClus, self).__init__(
self._handler,
identifier="ensclus",
title="EnsClus - Ensemble Clustering",
version=runner.VERSION,
abstract="""Cluster analysis tool based on the k-means algorithm
for ensembles of climate model simulations. EnsClus group
ensemble members according to similar characteristics and
select the most representative member for each cluster. The estimated calculation
time of this process is 4 minutes for the default values supplied.
The Ensemble Clustering metric requires at least two models to be chosen,
choosing more models is supported.
""",
metadata=[
Metadata('ESMValTool', 'http://www.esmvaltool.org/'),
Metadata('Documentation',
'https://esmvaltool.readthedocs.io/en/v2.0a2/recipes/recipe_ensclus.html',
role=util.WPS_ROLE_DOC),
],
inputs=inputs,
outputs=outputs,
status_supported=True,
store_supported=True)
def _handler(self, request, response):
response.update_status("starting ...", 0)
# build esgf search constraints
constraints = dict(
models=request.inputs['model'],
ensembles=request.inputs['ensemble'],
experiments=request.inputs['experiment'],
)
check_constraints(constraints)
options = dict(
season=request.inputs['season'][0].data,
area=request.inputs['area'][0].data,
extreme=request.inputs['extreme'][0].data,
numclus=request.inputs['numclus'][0].data,
perc=request.inputs['perc'][0].data,
numpcs=request.inputs['numpcs'][0].data,
variable=request.inputs['variable'][0].data,
)
# generate recipe
response.update_status("generate recipe ...", 10)
recipe_file, config_file = runner.generate_recipe(
workdir=self.workdir,
diag='ensclus',
constraints=constraints,
start_year=request.inputs['start_year'][0].data,
end_year=request.inputs['end_year'][0].data,
output_format='png',
options=options,
)
# recipe output
response.outputs['recipe'].output_format = FORMATS.TEXT
response.outputs['recipe'].file = recipe_file
# run diag
response.update_status("running diagnostic (this could take a while)...", 20)
result = runner.run(recipe_file, config_file)
# log output
response.outputs['log'].output_format = FORMATS.TEXT
response.outputs['log'].file = result['logfile']
# debug log output
response.outputs['debug_log'].output_format = FORMATS.TEXT
response.outputs['debug_log'].file = result['debug_logfile']
response.outputs['success'].data = result['success']
if result['success']:
try:
self.get_outputs(result, response)
except Exception as e:
response.update_status("exception occured: " + str(e), 85)
else:
LOGGER.exception('esmvaltool failed!')
response.update_status("exception occured: " + result['exception'], 85)
response.update_status("creating archive of diagnostic result ...", 90)
response.outputs['archive'].output_format = Format('application/zip')
response.outputs['archive'].file = runner.compress_output(
os.path.join(self.workdir, 'output'),
os.path.join(self.workdir, 'ensemble_clustering_result.zip'))
response.update_status("done.", 100)
return response
[docs] def get_outputs(self, result, response):
# result plot
response.update_status("collecting output ...", 80)
response.outputs['plot'].output_format = Format('application/eps')
response.outputs['plot'].file = runner.get_output(result['plot_dir'],
path_filter=os.path.join('EnsClus', 'main'),
name_filter="anomalies*",
output_format="png")
response.outputs['ens_extreme'].output_format = FORMATS.NETCDF
response.outputs['ens_extreme'].file = runner.get_output(result['work_dir'],
path_filter=os.path.join('EnsClus', 'main'),
name_filter="ens_extreme*",
output_format="nc")
response.outputs['ens_climatologies'].output_format = FORMATS.NETCDF
response.outputs['ens_climatologies'].file = runner.get_output(result['work_dir'],
path_filter=os.path.join('EnsClus', 'main'),
name_filter="ens_anomalies*",
output_format="nc")
response.outputs['ens_anomalies'].output_format = FORMATS.NETCDF
response.outputs['ens_anomalies'].file = runner.get_output(result['work_dir'],
path_filter=os.path.join('EnsClus', 'main'),
name_filter="ens_anomalies*",
output_format="nc")
response.outputs['statistics'].output_format = FORMATS.TEXT
response.outputs['statistics'].file = runner.get_output(result['work_dir'],
path_filter=os.path.join('EnsClus', 'main'),
name_filter="statistics*",
output_format="txt")