Add redeploy_server processing

Adds the functionality to redeploy a server in an unguarded fashion,
meaning that the server is not pre-validated to confirm that its
workloads have been removed.

This is the first targeted action for Shipyard, so the validators have
been refactored to support more flexibility.

Also adds RBAC controls for creating specific actions, rather than a
single binary create-action privilege.

Change-Id: I39e3dab6595c5187affb9f2b6edadd0f5f925b0c
Bryan Strassner 2018-07-18 17:59:53 -05:00
parent 7d9ca0b69b
commit f3749ca3f9
36 changed files with 1463 additions and 587 deletions


@ -69,13 +69,13 @@ docs: clean build_docs
.PHONY: security
security:
cd $(BUILD_CTX)/shipyard_airflow; tox -e bandit
cd $(BUILD_CTX)/shipyard_client; tox -e bandit
cd $(BUILD_CTX)/shipyard_airflow; tox -e bandit
.PHONY: tests
tests:
cd $(BUILD_CTX)/shipyard_airflow; tox
cd $(BUILD_CTX)/shipyard_client; tox
cd $(BUILD_CTX)/shipyard_airflow; tox
# Make targets intended for use by the primary targets above.
@ -130,13 +130,13 @@ clean:
rm -rf $(BUILD_DIR)/*
rm -rf build
rm -rf docs/build
cd $(BUILD_CTX)/shipyard_airflow; rm -rf build
cd $(BUILD_CTX)/shipyard_client; rm -rf build
cd $(BUILD_CTX)/shipyard_airflow; rm -rf build
.PHONY: pep8
pep8:
cd $(BUILD_CTX)/shipyard_airflow; tox -e pep8
cd $(BUILD_CTX)/shipyard_client; tox -e pep8
cd $(BUILD_CTX)/shipyard_airflow; tox -e pep8
.PHONY: helm_lint
helm_lint: clean helm-init


@ -245,7 +245,7 @@ id of the action invoked so that it can be queried subsequently.
[--allow-intermediate-commits]
Example:
shipyard create action redeploy_server --param="server-name=mcp"
shipyard create action redeploy_server --param="target_nodes=mcp"
shipyard create action update_site --param="continue-on-fail=true"
<action_command>


@ -63,3 +63,19 @@
# GET /api/v1.0/site_statuses
#"workflow_orchestrator:get_site_statuses": "rule:admin_required"
# Create a workflow action to deploy the site
# POST /api/v1.0/actions
#"workflow_orchestrator:action_deploy_site": "rule:admin_required"
# Create a workflow action to update the site
# POST /api/v1.0/actions
#"workflow_orchestrator:action_update_site": "rule:admin_required"
# Create a workflow action to update the site software
# POST /api/v1.0/actions
#"workflow_orchestrator:action_update_software": "rule:admin_required"
# Create a workflow action to redeploy target servers
# POST /api/v1.0/actions
#"workflow_orchestrator:action_redeploy_server": "rule:admin_required"


@ -19,10 +19,47 @@
Action Commands
===============
Example invocation
------------------
API input to create an action follows this pattern, varying the name field:
Without Parameters::
POST /v1.0/actions
{"name" : "update_site"}
With Parameters::
POST /v1.0/actions
{
"name": "redeploy_server",
"parameters": {
"target_nodes": ["node1", "node2"]
}
}
POST /v1.0/actions
{
"name": "update_site",
"parameters": {
"continue-on-fail": "true"
}
}
Analogous CLI commands::
shipyard create action update_site
shipyard create action redeploy_server --param="target_nodes=node1,node2"
shipyard create action update_site --param="continue-on-fail=true"
Supported actions
-----------------
These actions are currently supported using the Action API
These actions are currently supported using the Action API and CLI
.. _deploy_site:
@ -70,30 +107,47 @@ configuration documents. Steps, conceptually:
#. Armada build
Orchestrates Armada to configure software on the nodes as designed.
Actions under development
~~~~~~~~~~~~~~~~~~~~~~~~~
.. _redeploy_server:
These actions are under active development
redeploy_server
~~~~~~~~~~~~~~~
Using parameters to indicate which server(s) triggers a teardown and
subsequent deployment of those servers to restore them to the current
committed design.
- redeploy_server
This action is a `target action`, and does not apply the `site action`
labels to the revision of documents in Deckhand. Application of site action
labels is reserved for site actions such as `deploy_site` and `update_site`.
Using parameters to indicate which server(s) to target, this action triggers a
redeployment of those servers to the last-known-good design and secrets.
Like other `target actions` that will use a baremetal or Kubernetes node as
a target, the `target_nodes` parameter will be used to list the names of the
nodes that will be acted upon.
.. danger::
At this time, there are no safeguards that check the running workload
before a server is torn down, and the result may be *very* disruptive
to a working site. Users are cautioned to ensure the server being
torn down is not running a critical workload.
To support controlling this, the Shipyard service allows actions to be
associated with RBAC rules. A deployment of Shipyard can restrict access
to this action to help prevent unexpected disaster.
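For example, a deployment could tighten the rule for this action in the
Shipyard policy file; the role shown here is hypothetical::

    # Restrict creation of redeploy_server actions to a dedicated role
    "workflow_orchestrator:action_redeploy_server": "role:redeploy_admin"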
Future actions
~~~~~~~~~~~~~~
These actions are anticipated for development
- test region
test region
Invoke site validation testing - perhaps a baseline is an invocation of all
component's exposed tests or extended health checks. This test would be used
components' exposed tests or extended health checks. This test would be used
as a preflight-style test to ensure all components are in a working state.
- test component
test component
Invoke a particular platform component to test it. This test would be
used to interrogate a particular platform component to ensure it is in a
working state, and that its own downstream dependencies are also
operational
update labels
Triggers an update to the Kubernetes node labels for specified server(s)


@ -26,7 +26,7 @@ control plane life-cycle management, and is part of the `Airship`_ platform.
sampleconf
API
API-action-commands
action-commands
CLI
site-definition-documents
client-user-guide


@ -63,3 +63,18 @@
# GET /api/v1.0/site_statuses
#"workflow_orchestrator:get_site_statuses": "rule:admin_required"
# Create a workflow action to deploy the site
# POST /api/v1.0/actions
#"workflow_orchestrator:action_deploy_site": "rule:admin_required"
# Create a workflow action to update the site
# POST /api/v1.0/actions
#"workflow_orchestrator:action_update_site": "rule:admin_required"
# Create a workflow action to update the site software
# POST /api/v1.0/actions
#"workflow_orchestrator:action_update_software": "rule:admin_required"
# Create a workflow action to redeploy target servers
# POST /api/v1.0/actions
#"workflow_orchestrator:action_redeploy_server": "rule:admin_required"


@ -18,35 +18,37 @@ there are any validation failures.
"""
import logging
import falcon
from shipyard_airflow.common.document_validators.document_validator_manager \
import DocumentValidationManager
from shipyard_airflow.control import service_clients
from shipyard_airflow.control.validators.validate_deployment_configuration \
import ValidateDeploymentConfigurationBasic
from shipyard_airflow.control.validators.validate_deployment_configuration \
import ValidateDeploymentConfigurationFull
from shipyard_airflow.errors import ApiError
from shipyard_airflow.control.validators.validate_committed_revision import \
ValidateCommittedRevision
from shipyard_airflow.control.validators.validate_deployment_action import \
ValidateDeploymentAction
from shipyard_airflow.control.validators.validate_intermediate_commit import \
ValidateIntermediateCommit
from shipyard_airflow.control.validators.validate_target_nodes import \
ValidateTargetNodes
LOG = logging.getLogger(__name__)
def validate_site_action_full(action):
def validate_committed_revision(action, **kwargs):
"""Invokes a validation that the committed revision of site design exists
"""
validator = ValidateCommittedRevision(action=action)
validator.validate()
def validate_deployment_action_full(action, **kwargs):
"""Validates that the deployment configuration is correctly set up
Checks:
- The deployment configuration from Deckhand using the design version
- If the deployment configuration is missing, error
- The deployment strategy from the deployment configuration.
- If the deployment strategy is specified, but is missing, error.
- Check that there are no cycles in the groups
"""
validator = _SiteActionValidator(
validator = ValidateDeploymentAction(
dh_client=service_clients.deckhand_client(),
action=action,
full_validation=True
@ -54,16 +56,14 @@ def validate_site_action_full(action):
validator.validate()
def validate_site_action_basic(action):
def validate_deployment_action_basic(action, **kwargs):
"""Validates that the DeploymentConfiguration is present
Checks:
- The deployment configuration from Deckhand using the design version
- If the deployment configuration is missing, error
"""
validator = _SiteActionValidator(
validator = ValidateDeploymentAction(
dh_client=service_clients.deckhand_client(),
action=action,
full_validation=False
@ -71,72 +71,22 @@ def validate_site_action_basic(action):
validator.validate()
class _SiteActionValidator:
"""The validator object used by the validate_site_action_<x> functions
def validate_intermediate_commits(action, configdocs_helper, **kwargs):
"""Validates that intermediate commits don't exist
Prevents the execution of an action if there are intermediate commits
since the last site action. If 'allow_intermediate_commits' is set on the
action, allows the action to continue
"""
def __init__(self, dh_client, action, full_validation=True):
self.action = action
self.doc_revision = self._get_doc_revision()
self.cont_on_fail = str(self._action_param(
'continue-on-fail')).lower() == 'true'
if full_validation:
# Perform a complete validation
self.doc_val_mgr = DocumentValidationManager(
dh_client,
self.doc_revision,
[(ValidateDeploymentConfigurationFull,
'deployment-configuration')]
)
else:
# Perform a basic validation only
self.doc_val_mgr = DocumentValidationManager(
dh_client,
self.doc_revision,
[(ValidateDeploymentConfigurationBasic,
'deployment-configuration')]
)
validator = ValidateIntermediateCommit(
action=action, configdocs_helper=configdocs_helper)
validator.validate()
def validate(self):
results = self.doc_val_mgr.validate()
if self.doc_val_mgr.errored:
if self.cont_on_fail:
LOG.warn("Validation failures occured, but 'continue-on-fail' "
"is set to true. Processing continues")
else:
raise ApiError(
title='Document validation failed',
description='InvalidConfigurationDocuments',
status=falcon.HTTP_400,
error_list=results,
retry=False,
)
def _action_param(self, p_name):
"""Retrieve the value of the specified parameter or None if it doesn't
exist
"""
try:
return self.action['parameters'][p_name]
except KeyError:
return None
def validate_target_nodes(action, **kwargs):
"""Validates the target_nodes parameter
def _get_doc_revision(self):
"""Finds the revision id for the committed revision"""
doc_revision = self.action.get('committed_rev_id')
if doc_revision is None:
raise ApiError(
title='Invalid document revision',
description='InvalidDocumentRevision',
status=falcon.HTTP_400,
error_list=[{
'message': (
'Action {} with id {} was unable to find a valid '
'committed document revision'.format(
self.action.get('name'),
self.action.get('id')
)
)
}],
retry=False,
)
return doc_revision
Ensures the target_nodes parameter is present and properly specified.
"""
validator = ValidateTargetNodes(action=action)
validator.validate()


@ -45,19 +45,39 @@ def _action_mappings():
return {
'deploy_site': {
'dag': 'deploy_site',
'validators': [action_validators.validate_site_action_full]
'rbac_policy': policy.ACTION_DEPLOY_SITE,
'validators': [
action_validators.validate_committed_revision,
action_validators.validate_intermediate_commits,
action_validators.validate_deployment_action_full,
]
},
'update_site': {
'dag': 'update_site',
'validators': [action_validators.validate_site_action_full]
'rbac_policy': policy.ACTION_UPDATE_SITE,
'validators': [
action_validators.validate_committed_revision,
action_validators.validate_intermediate_commits,
action_validators.validate_deployment_action_full,
]
},
'update_software': {
'dag': 'update_software',
'validators': [action_validators.validate_site_action_basic]
'rbac_policy': policy.ACTION_UPDATE_SOFTWARE,
'validators': [
action_validators.validate_committed_revision,
action_validators.validate_intermediate_commits,
action_validators.validate_deployment_action_basic,
]
},
'redeploy_server': {
'dag': 'redeploy_server',
'validators': []
'rbac_policy': policy.ACTION_REDEPLOY_SERVER,
'validators': [
action_validators.validate_target_nodes,
action_validators.validate_committed_revision,
action_validators.validate_deployment_action_basic,
]
}
}
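With this mapping structure, registering another action is a matter of adding
an entry. A hypothetical sketch for the anticipated `update labels` action
(the DAG name, policy constant, and validator list below are illustrative,
not part of this change):
'update_labels': {
    'dag': 'update_labels',
    'rbac_policy': policy.ACTION_UPDATE_LABELS,  # hypothetical constant
    'validators': [
        action_validators.validate_target_nodes,
        action_validators.validate_committed_revision,
    ]
}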
@ -100,7 +120,6 @@ class ActionsResource(BaseResource):
resp.location = '/api/v1.0/actions/{}'.format(action['id'])
def create_action(self, action, context, allow_intermediate_commits=False):
action_mappings = _action_mappings()
# use uuid assigned for this request as the id of the action.
action['id'] = ulid.ulid()
# the invoking user
@ -109,12 +128,18 @@ class ActionsResource(BaseResource):
action['timestamp'] = str(datetime.utcnow())
# validate that action is supported.
LOG.info("Attempting action: %s", action['name'])
action_mappings = _action_mappings()
if action['name'] not in action_mappings:
raise ApiError(
title='Unable to start action',
description='Unsupported Action: {}'.format(action['name']))
dag = action_mappings.get(action['name'])['dag']
action_cfg = action_mappings.get(action['name'])
# check access to specific actions - lack of access will exception out
policy.check_auth(context, action_cfg['rbac_policy'])
dag = action_cfg['dag']
action['dag_id'] = dag
# Set up configdocs_helper
@ -122,18 +147,19 @@ class ActionsResource(BaseResource):
# Retrieve last committed design revision
action['committed_rev_id'] = self.get_committed_design_version()
# Check for intermediate commit
self.check_intermediate_commit_revision(allow_intermediate_commits)
# Set if intermediate commits are ignored
action['allow_intermediate_commits'] = allow_intermediate_commits
# populate action parameters if they are not set
if 'parameters' not in action:
action['parameters'] = {}
# validate if there is any validation to do
for validator in action_mappings.get(action['name'])['validators']:
# validators will raise ApiError if they are not validated.
validator(action)
for validator in action_cfg['validators']:
# validators will raise ApiError if they fail validation.
# validators are expected to accept action as a parameter, but
# handle all other kwargs (e.g. def vdtr(action, **kwargs)), even if
# they don't use those parameters.
validator(action=action, configdocs_helper=self.configdocs_helper)
# invoke airflow, get the dag's date
dag_execution_date = self.invoke_airflow_dag(
@ -347,43 +373,16 @@ class ActionsResource(BaseResource):
)
def get_committed_design_version(self):
"""Retrieves the committed design version from Deckhand.
LOG.info("Checking for committed revision in Deckhand...")
Returns None if there is no committed version
"""
committed_rev_id = self.configdocs_helper.get_revision_id(
configdocs_helper.COMMITTED
)
if committed_rev_id:
LOG.info("The committed revision in Deckhand is %d",
committed_rev_id)
return committed_rev_id
else:
raise ApiError(
title='Unable to locate any committed revision in Deckhand',
description='No committed version found in Deckhand',
status=falcon.HTTP_404,
retry=False)
def check_intermediate_commit_revision(self,
allow_intermediate_commits=False):
LOG.info("Checking for intermediate committed revision in Deckhand...")
intermediate_commits = (
self.configdocs_helper.check_intermediate_commit())
if intermediate_commits and not allow_intermediate_commits:
raise ApiError(
title='Intermediate Commit Detected',
description=(
'The current committed revision of documents has '
'other prior commits that have not been used as '
'part of a site action, e.g. update_site. If you '
'are aware and these other commits are intended, '
'please rerun this action with the option '
'`allow-intermediate-commits=True`'),
status=falcon.HTTP_409,
retry=False
)
LOG.info("No committed revision found in Deckhand")
return None
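Because create_action dispatches each validator with the same keyword
arguments, a conforming validator only needs to accept `action` and absorb
everything else through `**kwargs`. A minimal sketch of such a validator
(hypothetical, for illustration only):
def validate_example(action, **kwargs):
    # Hypothetical validator following the (action, **kwargs) contract
    if not action.get('parameters'):
        raise ApiError(
            title='Missing parameters',
            description='This action requires parameters',
            status=falcon.HTTP_400,
            retry=False)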


@ -0,0 +1,41 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import falcon
from shipyard_airflow.errors import ApiError
LOG = logging.getLogger(__name__)
class ValidateCommittedRevision:
"""Validate that the committed revision was found in Deckhand
Does not perform the actual lookup - only validates that the action has
the value populated with a valid value other than `None`
"""
def __init__(self, action):
self.action = action
def validate(self):
if self.action.get('committed_rev_id') is None:
raise ApiError(
title='No committed configdocs',
description=(
'Unable to locate a committed revision in Deckhand'
),
status=falcon.HTTP_400,
retry=False
)
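Usage is direct; given an action whose committed revision was never resolved,
validate() raises the 400 error above (values are illustrative):
action = {'id': '123', 'name': 'deploy_site', 'committed_rev_id': None}
ValidateCommittedRevision(action=action).validate()  # raises ApiError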


@ -0,0 +1,76 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import falcon
from .validate_deployment_configuration \
import ValidateDeploymentConfigurationBasic
from .validate_deployment_configuration \
import ValidateDeploymentConfigurationFull
from shipyard_airflow.common.document_validators.document_validator_manager \
import DocumentValidationManager
from shipyard_airflow.errors import ApiError
LOG = logging.getLogger(__name__)
class ValidateDeploymentAction:
"""The validator used by the validate_deployment_action_<x> functions
"""
def __init__(self, dh_client, action, full_validation=True):
self.action = action
self.doc_revision = self.action.get('committed_rev_id')
self.cont_on_fail = str(self._action_param(
'continue-on-fail')).lower() == 'true'
if full_validation:
# Perform a complete validation
self.doc_val_mgr = DocumentValidationManager(
dh_client,
self.doc_revision,
[(ValidateDeploymentConfigurationFull,
'deployment-configuration')]
)
else:
# Perform a basic validation only
self.doc_val_mgr = DocumentValidationManager(
dh_client,
self.doc_revision,
[(ValidateDeploymentConfigurationBasic,
'deployment-configuration')]
)
def validate(self):
results = self.doc_val_mgr.validate()
if self.doc_val_mgr.errored:
if self.cont_on_fail:
LOG.warn("Validation failures occured, but 'continue-on-fail' "
"is set to true. Processing continues")
else:
raise ApiError(
title='Document validation failed',
description='InvalidConfigurationDocuments',
status=falcon.HTTP_400,
error_list=results,
retry=False,
)
def _action_param(self, p_name):
"""Retrieve the value of the specified parameter or None if it doesn't
exist
"""
try:
return self.action['parameters'][p_name]
except KeyError:
return None


@ -0,0 +1,51 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import falcon
from shipyard_airflow.errors import ApiError
LOG = logging.getLogger(__name__)
class ValidateIntermediateCommit:
"""Validtor to ensure that intermediate commits are not present
If allow_intermediate_commits is set on the action, this validator will
not check.
"""
def __init__(self, action, configdocs_helper):
self.action = action
self.configdocs_helper = configdocs_helper
def validate(self):
if self.action.get('allow_intermediate_commits'):
LOG.debug("Intermediate commit check skipped due to user input")
else:
intermediate_commits = (
self.configdocs_helper.check_intermediate_commit())
if intermediate_commits:
raise ApiError(
title='Intermediate commit detected',
description=(
'The current committed revision of documents has '
'other prior commits that have not been used as '
'part of a site action, e.g. update_site. If you '
'are aware and these other commits are intended, '
'please rerun this action with the option '
'`allow-intermediate-commits=True`'),
status=falcon.HTTP_409,
retry=False
)


@ -0,0 +1,66 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import falcon
from shipyard_airflow.errors import ApiError
LOG = logging.getLogger(__name__)
class ValidateTargetNodes:
"""Validate that the target_nodes parameter has values in it
For actions that target nodes, this parameter must have at least one value
in it, and each value should be a string
"""
def __init__(self, action):
self.action = action
def validate(self):
parameters = self.action.get('parameters')
valid = parameters is not None
if valid:
# target_nodes parameter should exist
nodes = parameters.get('target_nodes')
valid = nodes is not None
if valid:
# should be able to listify the nodes
try:
node_list = list(nodes)
valid = len(node_list) > 0
except TypeError:
valid = False
if valid:
# each entry should be a string
for s in node_list:
if not isinstance(s, str):
valid = False
break
if valid:
# all valid
return
# something was invalid
raise ApiError(
title='Invalid target_nodes parameter',
description=(
'The target_nodes parameter for this action '
'should be a list with one or more string values '
'representing node names'
),
status=falcon.HTTP_400,
retry=False
)
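For illustration, these parameter shapes pass or fail the checks above
(a sketch using the class as defined):
ValidateTargetNodes(action={'parameters': {'target_nodes': ['node1']}}).validate()  # passes
ValidateTargetNodes(action={'parameters': {}}).validate()  # raises ApiError: no target_nodes
ValidateTargetNodes(action={'parameters': {'target_nodes': [1, 2]}}).validate()  # raises ApiError: not strings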


@ -23,6 +23,7 @@ try:
from airflow.operators import DeckhandRetrieveRenderedDocOperator
from airflow.operators import DeploymentConfigurationOperator
from airflow.operators import DeckhandCreateSiteActionTagOperator
from airflow.operators import DrydockDestroyNodeOperator
except ImportError:
# for local testing, they are loaded from their source directory
from shipyard_airflow.plugins.concurrency_check_operator import \
@ -33,6 +34,8 @@ except ImportError:
DeploymentConfigurationOperator
from shipyard_airflow.plugins.deckhand_create_site_action_tag import \
DeckhandCreateSiteActionTagOperator
from shipyard_airflow.plugins.drydock_destroy_nodes import \
DrydockDestroyNodeOperator
try:
# modules reside in a flat directory when deployed with dags
@ -61,14 +64,22 @@ class CommonStepFactory(object):
A factory to generate steps that are reused among multiple dags
"""
def __init__(self, parent_dag_name, dag, default_args):
def __init__(self, parent_dag_name, dag, default_args, action_type):
"""Creates a factory
Uses the specified parent_dag_name
:param parent_dag_name: the name of the base DAG that this step
factory will service
:param dag: the dag object
:param default_args: the default args from the dag that will be used
by steps in lieu of overridden values.
:param action_type: defines the type of action - site, targeted, possibly
others that will be stored on xcom if the action_xcom step is used.
This can then be used to drive behavior in later steps.
"""
self.parent_dag_name = parent_dag_name
self.dag = dag
self.default_args = default_args
self.action_type = action_type or 'default'
def get_action_xcom(self, task_id=dn.ACTION_XCOM):
"""Generate the action_xcom step
@ -81,11 +92,13 @@ class CommonStepFactory(object):
Defines a push function to store the content of 'action' that is
defined via 'dag_run' in XCOM so that it can be used by the
Operators
Operators. Includes action-related information for later steps.
"""
kwargs['ti'].xcom_push(key='action',
value=kwargs['dag_run'].conf['action'])
kwargs['ti'].xcom_push(key='action_type',
value=self.action_type)
return PythonOperator(task_id=task_id,
dag=self.dag,
@ -189,6 +202,21 @@ class CommonStepFactory(object):
on_failure_callback=step_failure_handler,
dag=self.dag)
def get_unguarded_destroy_servers(self, task_id=dn.DESTROY_SERVER):
"""Generates an unguarded destroy server step.
This version of destroying servers does no pre-validations or extra
shutdowns of anything. It unconditionally triggers Drydock to destroy
the server. The counterpart to this step is the subdag returned by the
get_destroy_server method below.
"""
return DrydockDestroyNodeOperator(
shipyard_conf=config_path,
main_dag_name=self.parent_dag_name,
task_id=task_id,
on_failure_callback=step_failure_handler,
dag=self.dag)
def get_destroy_server(self, task_id=dn.DESTROY_SERVER_DAG_NAME):
"""Generate a destroy server step


@ -28,3 +28,4 @@ DEPLOYMENT_CONFIGURATION = 'deployment_configuration'
GET_RENDERED_DOC = 'get_rendered_doc'
SKIP_UPGRADE_AIRFLOW = 'skip_upgrade_airflow'
UPGRADE_AIRFLOW = 'upgrade_airflow'
DESTROY_SERVER = 'destroy_nodes'


@ -45,7 +45,8 @@ dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None)
step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME,
dag=dag,
default_args=default_args)
default_args=default_args,
action_type='site')
action_xcom = step_factory.get_action_xcom()
concurrency_check = step_factory.get_concurrency_check()


@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.models import DAG
try:


@ -18,13 +18,14 @@ from airflow import DAG
try:
from common_step_factory import CommonStepFactory
from validate_site_design import BAREMETAL
except ImportError:
from shipyard_airflow.dags.common_step_factory import CommonStepFactory
from shipyard_airflow.dags.validate_site_design import BAREMETAL
"""redeploy_server
The top-level orchestration DAG for redeploying a server using the Undercloud
platform.
The top-level orchestration DAG for redeploying server(s).
"""
PARENT_DAG_NAME = 'redeploy_server'
@ -45,23 +46,29 @@ dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None)
step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME,
dag=dag,
default_args=default_args)
default_args=default_args,
action_type='targeted')
action_xcom = step_factory.get_action_xcom()
concurrency_check = step_factory.get_concurrency_check()
preflight = step_factory.get_preflight()
get_rendered_doc = step_factory.get_get_rendered_doc()
deployment_configuration = step_factory.get_deployment_configuration()
validate_site_design = step_factory.get_validate_site_design()
destroy_server = step_factory.get_destroy_server()
validate_site_design = step_factory.get_validate_site_design(
targets=[BAREMETAL]
)
# TODO(bryan-strassner): When the rest of the necessary functionality is in
# place, this step may need to be replaced with the guarded version of
# destroying servers.
# For now, this is the unguarded action, which will tear down the server
# without concern for any workload.
destroy_server = step_factory.get_unguarded_destroy_servers()
drydock_build = step_factory.get_drydock_build()
# DAG Wiring
concurrency_check.set_upstream(action_xcom)
preflight.set_upstream(concurrency_check)
get_rendered_doc.set_upstream(preflight)
deployment_configuration.set_upstream(get_rendered_doc)
validate_site_design.set_upstream(deployment_configuration)
deployment_configuration.set_upstream(action_xcom)
validate_site_design.set_upstream([
concurrency_check,
deployment_configuration
])
destroy_server.set_upstream(validate_site_design)
drydock_build.set_upstream(destroy_server)


@ -49,7 +49,8 @@ dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None)
step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME,
dag=dag,
default_args=default_args)
default_args=default_args,
action_type='site')
action_xcom = step_factory.get_action_xcom()
concurrency_check = step_factory.get_concurrency_check()


@ -46,7 +46,8 @@ dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None)
step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME,
dag=dag,
default_args=default_args)
default_args=default_args,
action_type='site')
action_xcom = step_factory.get_action_xcom()
concurrency_check = step_factory.get_concurrency_check()


@ -81,7 +81,7 @@ class ArmadaBaseOperator(UcpBaseOperator):
self.xcom_pusher = XcomPusher(self.task_instance)
# Logs uuid of action performed by the Operator
LOG.info("Armada Operator for action %s", self.action_info['id'])
LOG.info("Armada Operator for action %s", self.action_id)
# Set up armada client
self.armada_client = self._init_armada_client(


@ -93,7 +93,7 @@ class DeckhandBaseOperator(UcpBaseOperator):
# Logs uuid of Shipyard action
LOG.info("Executing Shipyard Action %s",
self.action_info['id'])
self.action_id)
# Retrieve Endpoint Information
self.deckhand_svc_endpoint = self.endpoints.endpoint_by_name(


@ -17,7 +17,6 @@ import logging
import time
from urllib.parse import urlparse
from airflow.exceptions import AirflowException
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
@ -51,13 +50,11 @@ LOG = logging.getLogger(__name__)
class DrydockBaseOperator(UcpBaseOperator):
"""Drydock Base Operator
All drydock related workflow operators will use the drydock
base operator as the parent and inherit attributes and methods
from this class
"""
@apply_defaults
@ -85,7 +82,6 @@ class DrydockBaseOperator(UcpBaseOperator):
the action and the deployment configuration
"""
super(DrydockBaseOperator,
self).__init__(
pod_selector_pattern=[{'pod_pattern': 'drydock-api',
@ -97,40 +93,36 @@ class DrydockBaseOperator(UcpBaseOperator):
self.redeploy_server = redeploy_server
self.svc_session = svc_session
self.svc_token = svc_token
self.target_nodes = None
def run_base(self, context):
"""Base setup/processing for Drydock operators
# Logs uuid of action performed by the Operator
LOG.info("DryDock Operator for action %s", self.action_info['id'])
:param context: the context supplied by the dag_run in Airflow
"""
LOG.debug("Drydock Operator for action %s", self.action_id)
# if continue processing is false, don't bother setting up things.
if self._continue_processing_flag():
self._setup_drydock_client()
# Skip workflow if health checks on Drydock failed and continue-on-fail
# option is turned on
def _continue_processing_flag(self):
"""Checks if this processing should continue or not
Skip workflow if health checks on Drydock failed and continue-on-fail
option is turned on.
Returns the self.continue_processing value.
"""
if self.xcom_puller.get_check_drydock_continue_on_fail():
LOG.info("Skipping %s as health checks on Drydock have "
"failed and continue-on-fail option has been "
"turned on", self.__class__.__name__)
# Set continue processing to False
self.continue_processing = False
return
# Retrieve information of the server that we want to redeploy if user
# executes the 'redeploy_server' dag
# Set node filter to be the server that we want to redeploy
if self.action_info['dag_id'] == 'redeploy_server':
self.redeploy_server = (
self.action_info['parameters']['server-name'])
if self.redeploy_server:
LOG.info("Server to be redeployed is %s",
self.redeploy_server)
self.node_filter = self.redeploy_server
else:
raise AirflowException('%s was unable to retrieve the '
'server to be redeployed.'
% self.__class__.__name__)
return self.continue_processing
def _setup_drydock_client(self):
"""Setup the drydock client for use by this operator"""
# Retrieve Endpoint Information
self.drydock_svc_endpoint = self.endpoints.endpoint_by_name(
service_endpoint.DRYDOCK
@ -145,31 +137,25 @@ class DrydockBaseOperator(UcpBaseOperator):
# information.
# The DrydockSession will care for TCP connection pooling
# and header management
LOG.info("Build DryDock Session")
dd_session = session.DrydockSession(drydock_url.hostname,
port=drydock_url.port,
auth_gen=self._auth_gen)
# Raise Exception if we are not able to set up the session
if dd_session:
LOG.info("Successfully Set Up DryDock Session")
else:
if not dd_session:
raise DrydockClientUseFailureException(
"Failed to set up Drydock Session!"
)
# Use the DrydockSession to build a DrydockClient that can
# be used to make one or more API calls
LOG.info("Create DryDock Client")
self.drydock_client = client.DrydockClient(dd_session)
# Raise Exception if we are not able to build the client
if self.drydock_client:
LOG.info("Successfully Set Up DryDock client")
else:
if not self.drydock_client:
raise DrydockClientUseFailureException(
"Failed to set up Drydock Client!"
)
LOG.info("Drydock Session and Client etablished.")
@shipyard_service_token
def _auth_gen(self):
@ -376,6 +362,115 @@ class DrydockBaseOperator(UcpBaseOperator):
"Unable to retrieve subtask info!"
)
def get_successes_for_task(self, task_id, extend_success=True):
"""Discover the successful nodes based on the current task id.
:param task_id: The id of the task
:param extend_success: determines if this result extends successes
or simply reports on the task.
Gets the set of successful nodes by examining the self.drydock_task_id.
The children are traversed recursively to display each sub-task's
information.
Only a reported success at the parent task indicates success of the
task. Drydock is assumed to roll up overall success to the top level.
"""
success_nodes = []
try:
task_dict = self.get_task_dict(task_id)
task_status = task_dict.get('status', "Not Specified")
task_result = task_dict.get('result')
if task_result is None:
LOG.warn("Task result is missing for task %s, with status %s."
" Neither successes nor further details can be"
" extracted from this result",
task_id, task_status)
else:
if extend_success:
try:
# successes and failures on the task result drive the
# interpretation of success or failure for this
# workflow.
# - Any node that is _only_ success for a task is a
# success to us.
# - Any node that is listed as a failure is a failure.
# This implies that a node listed as a success and a
# failure is a failure. E.g. some subtasks succeeded
# and some failed
t_successes = task_result.get('successes', [])
t_failures = task_result.get('failures', [])
actual_successes = set(t_successes) - set(t_failures)
# acquire the successes from success nodes
success_nodes.extend(actual_successes)
LOG.info("Nodes <%s> added as successes for task %s",
", ".join(success_nodes), task_id)
except KeyError:
# missing key on the path to getting nodes - don't add
LOG.warn(
"Missing successes field on result of task %s, "
"but a success field was expected. No successes"
" can be extracted from this result", task_id
)
pass
_report_task_info(task_id, task_result, task_status)
# for each child, report only the step info, do not add to overall
# success list.
for ch_task_id in task_dict.get('subtask_id_list', []):
success_nodes.extend(
self.get_successes_for_task(ch_task_id,
extend_success=False)
)
except Exception:
# since we are reporting task results, if we can't get the
# results, do not block the processing.
LOG.warn("Failed to retrieve a result for task %s. Exception "
"follows:", task_id, exc_info=True)
# deduplicate and return
return set(success_nodes)
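The interpretation of successes and failures above reduces to set arithmetic;
a worked example with illustrative values:
t_successes = ['node1', 'node2', 'node3']
t_failures = ['node2']
actual_successes = set(t_successes) - set(t_failures)
# {'node1', 'node3'}: node2 appears in both lists and counts as a failure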
def gen_node_name_filter(node_names):
"""Generates a drydock compatible node filter using only node names
:param node_names: the nodes with which to create a filter
"""
return {
'filter_set_type': 'union',
'filter_set': [
{
'filter_type': 'union',
'node_names': node_names
}
]
}
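For example, gen_node_name_filter(['node1', 'node2']) produces the following
filter, shown here for illustration:
{
    'filter_set_type': 'union',
    'filter_set': [
        {'filter_type': 'union', 'node_names': ['node1', 'node2']}
    ]
}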
def _report_task_info(task_id, task_result, task_status):
"""Logs information regarding a task.
:param task_id: id of the task
:param task_result: The result dictionary of the task
:param task_status: The status for the task
"""
# setup fields, or defaults if missing values
task_failures = task_result.get('failures', [])
task_successes = task_result.get('successes', [])
result_details = task_result.get('details', {'messageList': []})
result_status = task_result.get('status', "No status supplied")
LOG.info("Task %s with status %s/%s reports successes: [%s] and"
" failures: [%s]", task_id, task_status, result_status,
", ".join(task_successes), ", ".join(task_failures))
for message_item in result_details['messageList']:
context_type = message_item.get('context_type', 'N/A')
context_id = message_item.get('context', 'N/A')
message = message_item.get('message', "No message text supplied")
error = message_item.get('error', False)
timestamp = message_item.get('ts', 'No timestamp supplied')
LOG.info(" - Task %s for item %s:%s has message: %s [err=%s, at %s]",
task_id, context_type, context_id, message, error, timestamp)
class DrydockBaseOperatorPlugin(AirflowPlugin):


@ -11,38 +11,91 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Invoke the Drydock steps for destroying a node."""
import logging
import time
from airflow.exceptions import AirflowException
from airflow.plugins_manager import AirflowPlugin
try:
from drydock_base_operator import DrydockBaseOperator
from drydock_base_operator import gen_node_name_filter
from drydock_errors import (
DrydockTaskFailedException,
DrydockTaskTimeoutException
)
except ImportError:
from shipyard_airflow.plugins.drydock_base_operator import \
DrydockBaseOperator
from shipyard_airflow.plugins.drydock_base_operator import \
gen_node_name_filter
from shipyard_airflow.plugins.drydock_errors import (
DrydockTaskFailedException,
DrydockTaskTimeoutException
)
LOG = logging.getLogger(__name__)
class DrydockDestroyNodeOperator(DrydockBaseOperator):
"""Drydock Destroy Node Operator
This operator will trigger Drydock to destroy one or more bare metal
nodes
"""
def do_execute(self):
self.successes = []
# NOTE: This is a PlaceHolder function. The 'destroy_node'
# functionalities in DryDock is being worked on and is not
# ready at the moment.
LOG.info("Destroying node %s from cluster...",
self.redeploy_server)
time.sleep(15)
LOG.info("Successfully deleted node %s", self.redeploy_server)
LOG.info("Destroying nodes [%s]", ", ".join(self.target_nodes))
self.setup_configured_values()
self.node_filter = gen_node_name_filter(self.target_nodes)
self.execute_destroy()
self.successes = self.get_successes_for_task(self.drydock_task_id)
self.report_summary()
if not self.is_destroy_successful():
raise AirflowException(
"One or more nodes requested for destruction failed to destroy"
)
def setup_configured_values(self):
"""Retrieve and localize the interval and timeout values for destroy
"""
self.dest_interval = self.dc['physical_provisioner.destroy_interval']
self.dest_timeout = self.dc['physical_provisioner.destroy_timeout']
def execute_destroy(self):
"""Run the task to destroy the nodes specified in the node_filter
:param node_filter: The Drydock node filter with the nodes to destroy
"""
task_name = 'destroy_nodes'
self.create_task(task_name)
try:
self.query_task(self.dest_interval, self.dest_timeout)
except DrydockTaskFailedException:
LOG.exception("Task %s has failed. Some nodes may have been "
"destroyed. The report at the end of processing "
"this step contains the results", task_name)
except DrydockTaskTimeoutException:
LOG.warn("Task %s has timed out after %s seconds. Some nodes may "
"have been destroyed. The report at the end of "
"processing this step contains the results", task_name,
self.dest_timeout)
def report_summary(self):
"""Reports the successfully destroyed nodes"""
failed = list(set(self.target_nodes) - set(self.successes))
LOG.info("===== Destroy Nodes Summary =====")
LOG.info(" Nodes requested: %s", ", ".join(sorted(self.target_nodes)))
LOG.info(" Nodes destroyed: %s ", ", ".join(sorted(self.successes)))
LOG.info(" Nodes not destroyed: %s", ", ".join(sorted(failed)))
LOG.info("===== End Destroy Nodes Summary =====")
def is_destroy_successful(self):
"""Boolean if the destroy nodes was completely succesful."""
failed = set(self.target_nodes) - set(self.successes)
return not failed
class DrydockDestroyNodeOperatorPlugin(AirflowPlugin):


@ -36,6 +36,7 @@ from shipyard_airflow.common.deployment_group.node_lookup import NodeLookup
try:
import check_k8s_node_status
from drydock_base_operator import DrydockBaseOperator
from drydock_base_operator import gen_node_name_filter
from drydock_errors import (
DrydockTaskFailedException,
DrydockTaskTimeoutException
@ -44,6 +45,8 @@ except ImportError:
from shipyard_airflow.plugins import check_k8s_node_status
from shipyard_airflow.plugins.drydock_base_operator import \
DrydockBaseOperator
from shipyard_airflow.plugins.drydock_base_operator import \
gen_node_name_filter
from shipyard_airflow.plugins.drydock_errors import (
DrydockTaskFailedException,
DrydockTaskTimeoutException
@ -61,9 +64,8 @@ class DrydockNodesOperator(DrydockBaseOperator):
def do_execute(self):
self._setup_configured_values()
# setup self.strat_name and self.strategy
self.strategy = {}
self._setup_deployment_strategy()
# setup self.strategy
self.strategy = self.get_deployment_strategy()
dgm = _get_deployment_group_manager(
self.strategy['groups'],
_get_node_lookup(self.drydock_client, self.design_ref)
@ -119,7 +121,7 @@ class DrydockNodesOperator(DrydockBaseOperator):
"""
LOG.info("Group %s is preparing nodes", group.name)
self.node_filter = _gen_node_name_filter(group.actionable_nodes)
self.node_filter = gen_node_name_filter(group.actionable_nodes)
return self._execute_task('prepare_nodes',
self.prep_interval,
self.prep_timeout)
@ -132,7 +134,7 @@ class DrydockNodesOperator(DrydockBaseOperator):
"""
LOG.info("Group %s is deploying nodes", group.name)
self.node_filter = _gen_node_name_filter(group.actionable_nodes)
self.node_filter = gen_node_name_filter(group.actionable_nodes)
task_result = self._execute_task('deploy_nodes',
self.dep_interval,
self.dep_timeout)
@ -223,103 +225,76 @@ class DrydockNodesOperator(DrydockBaseOperator):
# Other AirflowExceptions will fail the whole task - let them do this.
# find successes
result.successes = self._get_successes_for_task(self.drydock_task_id)
result.successes = self.get_successes_for_task(self.drydock_task_id)
return result
def _get_successes_for_task(self, task_id, extend_success=True):
"""Discover the successful nodes based on the current task id.
:param task_id: The id of the task
:param extend_success: determines if this result extends successes
or simply reports on the task.
Gets the set of successful nodes by examining the self.drydock_task_id.
The children are traversed recursively to display each sub-task's
information.
Only a reported success at the parent task indicates success of the
task. Drydock is assumed to roll up overall success to the top level.
"""
success_nodes = []
try:
task_dict = self.get_task_dict(task_id)
task_status = task_dict.get('status', "Not Specified")
task_result = task_dict.get('result')
if task_result is None:
LOG.warn("Task result is missing for task %s, with status %s."
" Neither successes nor further details can be"
" extracted from this result",
task_id, task_status)
else:
if extend_success:
try:
# successes and failures on the task result drive the
# interpretation of success or failure for this
# workflow.
# - Any node that is _only_ success for a task is a
# success to us.
# - Any node that is listed as a failure is a failure.
# This implies that a node listed as a success and a
# failure is a failure. E.g. some subtasks succeeded
# and some failed
t_successes = task_result.get('successes', [])
t_failures = task_result.get('failures', [])
actual_successes = set(t_successes) - set(t_failures)
# acquire the successes from success nodes
success_nodes.extend(actual_successes)
LOG.info("Nodes <%s> added as successes for task %s",
", ".join(success_nodes), task_id)
except KeyError:
# missing key on the path to getting nodes - don't add
LOG.warn(
"Missing successes field on result of task %s, "
"but a success field was expected. No successes"
" can be extracted from this result", task_id
)
pass
_report_task_info(task_id, task_result, task_status)
# for each child, report only the step info, do not add to overall
# success list.
for ch_task_id in task_dict.get('subtask_id_list', []):
success_nodes.extend(
self._get_successes_for_task(ch_task_id,
extend_success=False)
)
except Exception:
# since we are reporting task results, if we can't get the
# results, do not block the processing.
LOG.warn("Failed to retrieve a result for task %s. Exception "
"follows:", task_id, exc_info=True)
# deduplicate and return
return set(success_nodes)
def _setup_deployment_strategy(self):
def get_deployment_strategy(self):
"""Determine the deployment strategy
Uses the specified strategy from the deployment configuration
or returns a default configuration of 'all-at-once'
"""
self.strat_name = self.dc['physical_provisioner.deployment_strategy']
if self.strat_name:
# if there is a deployment strategy specified, get it and use it
self.strategy = self.get_unique_doc(
name=self.strat_name,
schema="shipyard/DeploymentStrategy/v1"
)
if self.target_nodes:
# Set up a strategy with one group with the list of nodes, so those
# nodes are the only nodes processed.
LOG.info("Seting up deployment strategy using targeted nodes")
strat_name = 'targeted nodes'
strategy = gen_simple_deployment_strategy(name='target-group',
nodes=self.target_nodes)
else:
# The default behavior is to deploy all nodes, and fail if
# any nodes fail to deploy.
self.strat_name = 'all-at-once (defaulted)'
self.strategy = _default_deployment_strategy()
# Otherwise, do a strategy for the site - either from the
# configdocs or a default "everything".
strat_name = self.dc['physical_provisioner.deployment_strategy']
if strat_name:
# if there is a deployment strategy specified, use it
strategy = self.get_unique_doc(
name=strat_name,
schema="shipyard/DeploymentStrategy/v1"
)
else:
# The default behavior is to deploy all nodes, and fail if
# any nodes fail to deploy.
strat_name = 'all-at-once (defaulted)'
strategy = gen_simple_deployment_strategy()
LOG.info("Strategy Name: %s has %s groups",
self.strat_name,
len(self.strategy.get('groups', [])))
strat_name,
len(strategy.get('groups', [])))
return strategy
#
# Functions supporting the nodes operator class
#
def gen_simple_deployment_strategy(name=None, nodes=None):
"""Generates a single group deployment strategy
:param name: the name of the single group. Defaults to 'default'
:param nodes: the list of node_names to be used. Defaults to []
"""
target_name = name or 'default'
target_nodes = list(nodes) if nodes else []
return {
'groups': [
{
'name': target_name,
'critical': True,
'depends_on': [],
'selectors': [
{
'node_names': target_nodes,
'node_labels': [],
'node_tags': [],
'rack_names': [],
},
],
'success_criteria': {
'percent_successful_nodes': 100
},
}
]
}
def _get_node_lookup(drydock_client, design_ref):
"""Return a NodeLookup suitable for the DeploymentGroupManager
@ -409,71 +384,6 @@ def _process_deployment_groups(dgm, prepare_func, deploy_func):
dgm.evaluate_group_succ_criteria(group.name, Stage.DEPLOYED)
def _report_task_info(task_id, task_result, task_status):
"""Logs information regarding a task.
:param task_id: id of the task
:param task_result: The result dictionary of the task
:param task_status: The status for the task
"""
# setup fields, or defaults if missing values
task_failures = task_result.get('failures', [])
task_successes = task_result.get('successes', [])
result_details = task_result.get('details', {'messageList': []})
result_status = task_result.get('status', "No status supplied")
LOG.info("Task %s with status %s/%s reports successes: [%s] and"
" failures: [%s]", task_id, task_status, result_status,
", ".join(task_successes), ", ".join(task_failures))
for message_item in result_details['messageList']:
context_type = message_item.get('context_type', 'N/A')
context_id = message_item.get('context', 'N/A')
message = message_item.get('message', "No message text supplied")
error = message_item.get('error', False)
timestamp = message_item.get('ts', 'No timestamp supplied')
LOG.info(" - Task %s for item %s:%s has message: %s [err=%s, at %s]",
task_id, context_type, context_id, message, error, timestamp)
def _default_deployment_strategy():
"""The default deployment strategy for 'all-at-once'"""
return {
'groups': [
{
'name': 'default',
'critical': True,
'depends_on': [],
'selectors': [
{
'node_names': [],
'node_labels': [],
'node_tags': [],
'rack_names': [],
},
],
'success_criteria': {
'percent_successful_nodes': 100
},
}
]
}
def _gen_node_name_filter(node_names):
"""Generates a drydock compatible node filter using only node names
:param node_names: the nodes with which to create a filter
"""
return {
'filter_set_type': 'union',
'filter_set': [
{
'filter_type': 'union',
'node_names': node_names
}
]
}
class QueryTaskResult:
"""Represents a summarized query result from a task"""
def __init__(self, task_id, task_name):


@ -13,9 +13,8 @@
# limitations under the License.
import logging
from airflow.utils.decorators import apply_defaults
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from airflow.utils.decorators import apply_defaults
try:
import service_endpoint
@ -63,20 +62,7 @@ class PromenadeBaseOperator(UcpBaseOperator):
def run_base(self, context):
# Logs uuid of Shipyard action
LOG.info("Executing Shipyard Action %s", self.action_info['id'])
# Retrieve information of the server that we want to redeploy
# if user executes the 'redeploy_server' dag
if self.action_info['dag_id'] == 'redeploy_server':
self.redeploy_server = self.action_info['parameters'].get(
'server-name')
if self.redeploy_server:
LOG.info("Server to be redeployed is %s", self.redeploy_server)
else:
raise AirflowException('%s was unable to retrieve the '
'server to be redeployed.'
% self.__class__.__name__)
LOG.info("Executing Shipyard Action %s", self.action_id)
# Retrieve promenade endpoint
self.promenade_svc_endpoint = self.endpoints.endpoint_by_name(


@ -122,9 +122,15 @@ class UcpBaseOperator(BaseOperator):
# Set up and retrieve values from xcom
self.xcom_puller = XcomPuller(self.main_dag_name, self.task_instance)
self.action_info = self.xcom_puller.get_action_info()
self.action_type = self.xcom_puller.get_action_type()
self.dc = self.xcom_puller.get_deployment_configuration()
# Set up other common-use values
self.action_id = self.action_info['id']
self.revision_id = self.action_info['committed_rev_id']
self.action_params = self.action_info.get('parameters', {})
self.design_ref = self._deckhand_design_ref()
self._setup_target_nodes()
def get_k8s_logs(self):
"""Retrieve Kubernetes pod/container logs specified by an opererator
@ -155,6 +161,35 @@ class UcpBaseOperator(BaseOperator):
else:
LOG.debug("There are no pod logs specified to retrieve")
def _setup_target_nodes(self):
"""Sets up the target nodes field for this action
When managing a targeted action, this step needs to resolve the
target nodes. If no targets are found (which should be caught before
the DAG is invoked), an exception is raised so that the workflow does
not act on more nodes than were targeted.
Later, when creating the deployment group, if this value
(self.target_nodes) is set, it will be used in lieu of the design
based deployment strategy.
target_nodes will be a comma-separated string provided as part of the
parameters to an action on input to Shipyard.
"""
if self.action_type == 'targeted':
t_nodes = self.action_params.get('target_nodes', '')
self.target_nodes = [n.strip() for n in t_nodes.split(',')]
if not self.target_nodes:
raise AirflowException(
'{} ({}) requires targeted nodes, but was unable to '
'resolve any targets in {}'.format(
self.main_dag_name, self.action_id,
self.__class__.__name__
)
)
LOG.info("Target Nodes for action: [%s]",
', '.join(self.target_nodes))
else:
self.target_nodes = None
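For a targeted action created with --param="target_nodes=node1,node2", the
parameter arrives as a single comma-separated string and is split as sketched
here (illustrative values):
t_nodes = 'node1, node2'
target_nodes = [n.strip() for n in t_nodes.split(',')]
# ['node1', 'node2']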
def _deckhand_design_ref(self):
"""Assemble a deckhand design_ref"""
# Retrieve DeckHand Endpoint Information


@ -74,7 +74,7 @@ class XcomPuller(object):
key=key)
def get_action_info(self):
"""Retrive the action and action parameter info dictionary
"""Retrieve the action and action parameter info dictionary
Extract information related to current workflow. This is a dictionary
that contains information about the workflow such as action_id, name
@ -87,6 +87,15 @@ class XcomPuller(object):
dag_id=source_dag,
key=key)
def get_action_type(self):
"""Retrieve the action type"""
source_task = 'action_xcom'
source_dag = None
key = 'action_type'
return self._get_xcom(source_task=source_task,
dag_id=source_dag,
key=key)
def get_check_drydock_continue_on_fail(self):
"""Check if 'drydock_continue_on_fail' key exists"""
source_task = 'ucp_preflight_check'


@ -41,6 +41,10 @@ GET_RENDEREDCONFIGDOCS = 'workflow_orchestrator:get_renderedconfigdocs'
LIST_WORKFLOWS = 'workflow_orchestrator:list_workflows'
GET_WORKFLOW = 'workflow_orchestrator:get_workflow'
GET_SITE_STATUSES = 'workflow_orchestrator:get_site_statuses'
ACTION_DEPLOY_SITE = 'workflow_orchestrator:action_deploy_site'
ACTION_UPDATE_SITE = 'workflow_orchestrator:action_update_site'
ACTION_UPDATE_SOFTWARE = 'workflow_orchestrator:action_update_software'
ACTION_REDEPLOY_SERVER = 'workflow_orchestrator:action_redeploy_server'
class ShipyardPolicy(object):
@ -76,6 +80,8 @@ class ShipyardPolicy(object):
'method': 'GET'
}]
),
# See below for finer-grained action access. This controls the
# ability to create any action at all.
policy.DocumentedRuleDefault(
CREATE_ACTION,
RULE_ADMIN_REQUIRED,
@ -207,6 +213,45 @@ class ShipyardPolicy(object):
'method': 'GET'
}]
),
# Specific actions - can be controlled independently. See above for
# overall access to creating an action. This controls the ability to
# create specific actions (invoke specific workflows)
policy.DocumentedRuleDefault(
ACTION_DEPLOY_SITE,
RULE_ADMIN_REQUIRED,
'Create a workflow action to deploy the site',
[{
'path': '/api/v1.0/actions',
'method': 'POST'
}]
),
policy.DocumentedRuleDefault(
ACTION_UPDATE_SITE,
RULE_ADMIN_REQUIRED,
'Create a workflow action to update the site',
[{
'path': '/api/v1.0/actions',
'method': 'POST'
}]
),
policy.DocumentedRuleDefault(
ACTION_UPDATE_SOFTWARE,
RULE_ADMIN_REQUIRED,
'Create a workflow action to update the site software',
[{
'path': '/api/v1.0/actions',
'method': 'POST'
}]
),
policy.DocumentedRuleDefault(
ACTION_REDEPLOY_SERVER,
RULE_ADMIN_REQUIRED,
'Create a workflow action to redeploy target servers',
[{
'path': '/api/v1.0/actions',
'method': 'POST'
}]
),
]
# Regions Policy
@ -235,63 +280,66 @@ class ApiEnforcer(object):
def __call__(self, f):
@functools.wraps(f)
def secure_handler(slf, req, resp, *args, **kwargs):
ctx = req.context
policy_eng = ctx.policy_engine
LOG.info("Policy Engine: %s", policy_eng.__class__.__name__)
# perform auth
LOG.info("Enforcing policy %s on request %s",
self.action, ctx.request_id)
# policy engine must be configured
if policy_eng is None:
LOG.error(
"Error-Policy engine required-action: %s", self.action)
raise AppError(
title="Auth is not being handled by any policy engine",
status=falcon.HTTP_500,
retry=False
)
authorized = False
try:
if policy_eng.authorize(self.action, ctx):
# authorized
LOG.info("Request is authorized")
authorized = True
except:
# couldn't service the auth request
LOG.exception(
"Error - Expectation Failed - action: %s", self.action)
raise ApiError(
title="Expectation Failed",
status=falcon.HTTP_417,
retry=False
)
if authorized:
return f(slf, req, resp, *args, **kwargs)
else:
LOG.error("Auth check failed. Authenticated:%s",
ctx.authenticated)
# raise the appropriate response exception
if ctx.authenticated:
LOG.error("Error: Forbidden access - action: %s",
self.action)
raise ApiError(
title="Forbidden",
status=falcon.HTTP_403,
description="Credentials do not permit access",
retry=False
)
else:
LOG.error("Error - Unauthenticated access")
raise ApiError(
title="Unauthenticated",
status=falcon.HTTP_401,
description="Credentials are not established",
retry=False
)
check_auth(ctx=req.context, rule=self.action)
return f(slf, req, resp, *args, **kwargs)
return secure_handler
def check_auth(ctx, rule):
"""Checks the authorization to the requested rule
:param ctx: the request context for the action being performed
:param rule: the name of the policy rule to validate the user in the
context against
Returns if authorized, otherwise raises an ApiError.
"""
try:
policy_eng = ctx.policy_engine
LOG.info("Policy Engine: %s", policy_eng.__class__.__name__)
# perform auth
LOG.info("Enforcing policy %s on request %s", rule, ctx.request_id)
# policy engine must be configured
if policy_eng is None:
LOG.error(
"Error-Policy engine required-action: %s", rule)
raise AppError(
title="Auth is not being handled by any policy engine",
status=falcon.HTTP_500,
retry=False
)
if policy_eng.authorize(rule, ctx):
# authorized - log and return
LOG.info("Request to %s is authorized", rule)
return
except Exception as ex:
# couldn't service the auth request
LOG.exception("Error - Expectation Failed - action: %s", rule)
raise ApiError(
title="Expectation Failed",
status=falcon.HTTP_417,
retry=False
)
# raise the appropriate response exception
if ctx.authenticated:
# authenticated but not authorized
LOG.error("Error: Forbidden access - action: %s", rule)
raise ApiError(
title="Forbidden",
status=falcon.HTTP_403,
description="Credentials do not permit access",
retry=False
)
else:
LOG.error("Error - Unauthenticated access")
raise ApiError(
title="Unauthenticated",
status=falcon.HTTP_401,
description="Credentials are not established",
retry=False
)
def list_policies():
default_policy = []
default_policy.extend(ShipyardPolicy.base_rules)
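
Extracting check_auth means enforcement is no longer tied to the decorator alone: a handler admitted by the generic create-action rule can re-check the finer-grained per-action rule once the requested action name is parsed from the body. A hedged standalone sketch of that two-step flow, with FakeContext and PermissionError standing in for the request context, policy engine, and ApiError:

class FakeContext:
    def __init__(self, allowed_rules):
        self.allowed_rules = set(allowed_rules)

def check_auth(ctx, rule):
    # stand-in for policy_engine.authorize plus the ApiError responses above
    if rule not in ctx.allowed_rules:
        raise PermissionError('Forbidden: {}'.format(rule))

def create_action(ctx, action):
    # generic gate, the decorator equivalent of CREATE_ACTION
    check_auth(ctx, 'workflow_orchestrator:create_action')
    # finer-grained gate, only knowable after parsing the request body
    check_auth(ctx, 'workflow_orchestrator:action_{}'.format(action['name']))
    return 'scheduled'

ctx = FakeContext(['workflow_orchestrator:create_action',
                   'workflow_orchestrator:action_update_site'])
assert create_action(ctx, {'name': 'update_site'}) == 'scheduled'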


@@ -24,8 +24,11 @@ from shipyard_airflow.common.deployment_group.errors import (
InvalidDeploymentGroupNodeLookupError
)
from shipyard_airflow.control.action.action_validators import (
validate_site_action_basic,
validate_site_action_full
validate_committed_revision,
validate_deployment_action_basic,
validate_deployment_action_full,
validate_intermediate_commits,
validate_target_nodes
)
from shipyard_airflow.errors import ApiError
from tests.unit.common.deployment_group.node_lookup_stubs import node_lookup
@@ -76,10 +79,10 @@ class TestActionValidator:
@mock.patch("shipyard_airflow.control.validators."
"validate_deployment_strategy._get_node_lookup",
return_value=node_lookup)
def test_validate_site_action_full(self, *args):
def test_validate_deployment_action_full(self, *args):
"""Test the function that runs the validator class"""
try:
validate_site_action_full({
validate_deployment_action_full({
'id': '123',
'name': 'deploy_site',
'committed_rev_id': 1
@@ -93,12 +96,12 @@ class TestActionValidator:
@mock.patch("shipyard_airflow.control.validators."
"validate_deployment_strategy._get_node_lookup",
return_value=node_lookup)
def test_validate_site_action_full_cycle(self, *args):
def test_validate_deployment_action_full_cycle(self, *args):
"""Test the function that runs the validator class with a
deployment strategy that has a cycle in the groups
"""
with pytest.raises(ApiError) as apie:
validate_site_action_full({
validate_deployment_action_full({
'id': '123',
'name': 'deploy_site',
'committed_rev_id': 1
@@ -113,12 +116,12 @@ class TestActionValidator:
@mock.patch("shipyard_airflow.control.validators."
"validate_deployment_strategy._get_node_lookup",
return_value=node_lookup)
def test_validate_site_action_full_missing_dep_strat(self, *args):
def test_validate_deployment_action_full_missing_dep_strat(self, *args):
"""Test the function that runs the validator class with a missing
deployment strategy - specified, but not present
"""
with pytest.raises(ApiError) as apie:
validate_site_action_full({
validate_deployment_action_full({
'id': '123',
'name': 'deploy_site',
'committed_rev_id': 1
@@ -131,12 +134,12 @@ class TestActionValidator:
@mock.patch("shipyard_airflow.control.validators."
"validate_deployment_strategy._get_node_lookup",
return_value=node_lookup)
def test_validate_site_action_full_default_dep_strat(self, *args):
def test_validate_deployment_action_full_default_dep_strat(self, *args):
"""Test the function that runs the validator class with a defaulted
deployment strategy (not specified)
"""
try:
validate_site_action_full({
validate_deployment_action_full({
'id': '123',
'name': 'deploy_site',
'committed_rev_id': 1
@@ -145,33 +148,17 @@ class TestActionValidator:
# any exception is a failure
assert False
@mock.patch("shipyard_airflow.control.service_clients.deckhand_client",
return_value=fake_dh_doc_client('clean'), ds_name='defaulted')
@mock.patch("shipyard_airflow.control.validators."
"validate_deployment_strategy._get_node_lookup",
return_value=node_lookup)
def test_validate_site_missing_rev(self, *args):
"""Test the function that runs the validator class with a
deployment strategy that has a cycle in the groups
"""
with pytest.raises(ApiError) as apie:
validate_site_action_full({
'id': '123',
'name': 'deploy_site'
})
assert apie.value.description == 'InvalidDocumentRevision'
@mock.patch("shipyard_airflow.control.service_clients.deckhand_client",
return_value=fake_dh_doc_client('clean', ds_name='not-there'))
@mock.patch("shipyard_airflow.control.validators."
"validate_deployment_strategy._get_node_lookup",
return_value=node_lookup)
def test_validate_site_action_full_continue_failure(self, *args):
def test_validate_deployment_action_full_continue_failure(self, *args):
"""Test the function that runs the validator class with a missing
deployment strategy (not specified), but continue-on-fail specified
"""
try:
validate_site_action_full({
validate_deployment_action_full({
'id': '123',
'name': 'deploy_site',
'committed_rev_id': 1,
@@ -186,13 +173,13 @@ class TestActionValidator:
@mock.patch("shipyard_airflow.control.validators."
"validate_deployment_strategy._get_node_lookup",
return_value=node_lookup)
def test_validate_site_action_basic_missing_dep_strat(self, *args):
def test_validate_deployment_action_basic_missing_dep_strat(self, *args):
"""Test the function that runs the validator class with a missing
deployment strategy - specified, but not present. This should be
ignored by the basic validator
"""
try:
validate_site_action_basic({
validate_deployment_action_basic({
'id': '123',
'name': 'deploy_site',
'committed_rev_id': 1
@@ -206,7 +193,7 @@ class TestActionValidator:
@mock.patch("shipyard_airflow.control.validators."
"validate_deployment_strategy._get_node_lookup",
return_value=node_lookup)
def test_validate_site_action_dep_strategy_exceptions(self, *args):
def test_validate_deployment_action_dep_strategy_exceptions(self, *args):
"""Test the function that runs the validator class for exceptions"""
to_catch = [InvalidDeploymentGroupNodeLookupError,
InvalidDeploymentGroupError, DeploymentGroupCycleError]
@@ -217,7 +204,7 @@ class TestActionValidator:
side_effect=exc()
):
with pytest.raises(ApiError) as apie:
validate_site_action_full({
validate_deployment_action_full({
'id': '123',
'name': 'deploy_site',
'committed_rev_id': 1
@@ -233,10 +220,10 @@ class TestActionValidator:
@mock.patch("shipyard_airflow.control.validators."
"validate_deployment_strategy._get_deployment_group_manager",
side_effect=TypeError())
def test_validate_site_action_dep_strategy_exception_other(self, *args):
def test_validate_deployment_action_dep_strategy_exc_oth(self, *args):
"""Test the function that runs the validator class"""
with pytest.raises(ApiError) as apie:
validate_site_action_full({
validate_deployment_action_full({
'id': '123',
'name': 'deploy_site',
'committed_rev_id': 1
@@ -244,3 +231,70 @@ class TestActionValidator:
assert apie.value.description == 'InvalidConfigurationDocuments'
assert apie.value.error_list[0]['name'] == (
'DocumentValidationProcessingError')
def _action(self, params_field, comm_rev=1, allow=False):
action = {
'id': '123',
'name': 'redeploy_server',
'allow_intermediate_commits': allow
}
if comm_rev:
action['committed_rev_id'] = comm_rev
if params_field:
action['parameters'] = params_field
return action
def test_validate_target_nodes(self, *args):
"""Test the validate_target_nodes/ValidateTargetNodes validator"""
# pass - basic case
validate_target_nodes(self._action({'target_nodes': ['node1']}))
# missing parameter
with pytest.raises(ApiError) as apie:
validate_target_nodes(self._action(None))
assert apie.value.title == 'Invalid target_nodes parameter'
# no nodes
with pytest.raises(ApiError) as apie:
validate_target_nodes(self._action({'target_nodes': []}))
assert apie.value.title == 'Invalid target_nodes parameter'
# other parameter than target_nodes
with pytest.raises(ApiError) as apie:
validate_target_nodes(self._action({'no_nodes': ['what']}))
assert apie.value.title == 'Invalid target_nodes parameter'
# not a list-able target_nodes
with pytest.raises(ApiError) as apie:
validate_target_nodes(self._action({'target_nodes': pytest}))
assert apie.value.title == 'Invalid target_nodes parameter'
# not a list-able target_nodes
with pytest.raises(ApiError) as apie:
validate_target_nodes(
self._action({'target_nodes': [{'not': 'string'}]})
)
assert apie.value.title == 'Invalid target_nodes parameter'
def test_validate_committed_revision(self, *args):
"""Test the committed revision validator"""
validate_committed_revision(self._action(None))
with pytest.raises(ApiError) as apie:
validate_committed_revision(self._action(None, comm_rev=None))
assert apie.value.title == 'No committed configdocs'
def test_validate_intermediate_commits(self, *args):
"""Test the intermediate commit validator"""
ch_fail = CfgdHelperIntermediateCommit()
ch_success = CfgdHelperIntermediateCommit(commits=False)
validate_intermediate_commits(self._action(None), ch_success)
with pytest.raises(ApiError) as apie:
validate_intermediate_commits(self._action(None), ch_fail)
assert apie.value.title == 'Intermediate commit detected'
# bypass flag - no api error
validate_intermediate_commits(
self._action(None, allow=True), ch_fail
)
class CfgdHelperIntermediateCommit():
def __init__(self, commits=True):
self.commits = commits
def check_intermediate_commit(self):
return self.commits
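
The checks exercised above reduce to a few lines of logic. A standalone approximation of the target_nodes validation (ValueError stands in for shipyard's ApiError; the real ValidateTargetNodes class may differ in detail):

def check_target_nodes(action):
    parameters = action.get('parameters') or {}
    nodes = parameters.get('target_nodes')
    if not nodes:
        # missing parameters, missing key, or an empty list
        raise ValueError('Invalid target_nodes parameter')
    try:
        node_list = list(nodes)
    except TypeError:
        # not something that can be treated as a list
        raise ValueError('Invalid target_nodes parameter')
    if not all(isinstance(node, str) for node in node_list):
        # entries must be simple node-name strings
        raise ValueError('Invalid target_nodes parameter')
    return node_list

assert check_target_nodes(
    {'parameters': {'target_nodes': ['node1']}}) == ['node1']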


@@ -43,10 +43,6 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def CHECK_INTERMEDIATE_COMMIT(allow_intermediate_commits):
return False
def create_req(ctx, body):
'''creates a falcon request'''
env = testing.create_environ(
@@ -300,7 +296,8 @@ def test_get_all_actions():
assert action['dag_status'] == 'SUCCESS'
def test_create_action():
def _gen_action_resource_stubbed():
# TODO(bryan-strassner): maybe subclass this instead?
action_resource = ActionsResource()
action_resource.get_all_actions_db = actions_db
action_resource.get_all_dag_runs_db = dag_runs_db
@@ -309,97 +306,243 @@
action_resource.insert_action = insert_action_stub
action_resource.audit_control_command_db = audit_control_command_db
action_resource.get_committed_design_version = lambda: DESIGN_VERSION
action_resource.check_intermediate_commit_revision = (
CHECK_INTERMEDIATE_COMMIT)
return action_resource
@mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_deployment_action_basic')
@mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_deployment_action_full')
@mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_intermediate_commits')
def test_create_action_invalid_input(ic_val, full_val, basic_val):
action_resource = _gen_action_resource_stubbed()
# with invalid input. fail.
with mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_site_action_full') as validator:
try:
action = action_resource.create_action(
action={'name': 'broken',
'parameters': {
'a': 'aaa'
}},
context=context,
allow_intermediate_commits=False)
assert False, 'Should throw an ApiError'
except ApiError:
# expected
pass
assert not validator.called
with pytest.raises(ApiError):
action = action_resource.create_action(
action={'name': 'broken',
'parameters': {
'a': 'aaa'
}},
context=context,
allow_intermediate_commits=False)
assert not ic_val.called
assert not full_val.called
assert not basic_val.called
@mock.patch('shipyard_airflow.policy.check_auth')
@mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_deployment_action_basic')
@mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_deployment_action_full')
@mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_intermediate_commits')
def test_create_action_valid_input_and_params(ic_val, full_val, *args):
action_resource = _gen_action_resource_stubbed()
# with valid input and some parameters
with mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_site_action_full') as validator:
try:
action = action_resource.create_action(
action={'name': 'deploy_site',
'parameters': {
'a': 'aaa'
}},
context=context,
allow_intermediate_commits=False)
assert action['timestamp']
assert action['id']
assert len(action['id']) == 26
assert action['dag_execution_date'] == '2017-09-06 14:10:08.528402'
assert action['dag_status'] == 'SCHEDULED'
assert action['committed_rev_id'] == 1
except ApiError:
assert False, 'Should not raise an ApiError'
validator.assert_called_once_with(action)
try:
action = action_resource.create_action(
action={'name': 'deploy_site',
'parameters': {
'a': 'aaa'
}},
context=context,
allow_intermediate_commits=False)
assert action['timestamp']
assert action['id']
assert len(action['id']) == 26
assert action['dag_execution_date'] == '2017-09-06 14:10:08.528402'
assert action['dag_status'] == 'SCHEDULED'
assert action['committed_rev_id'] == 1
except ApiError:
assert False, 'Should not raise an ApiError'
full_val.assert_called_once_with(
action=action, configdocs_helper=action_resource.configdocs_helper)
ic_val.assert_called_once_with(
action=action, configdocs_helper=action_resource.configdocs_helper)
@mock.patch('shipyard_airflow.policy.check_auth')
@mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_deployment_action_basic')
@mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_deployment_action_full')
@mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_intermediate_commits')
def test_create_action_valid_input_no_params(ic_val, full_val, *args):
action_resource = _gen_action_resource_stubbed()
# with valid input and no parameters
with mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_site_action_full') as validator:
try:
action = action_resource.create_action(
action={'name': 'deploy_site'},
context=context,
allow_intermediate_commits=False)
assert action['timestamp']
assert action['id']
assert len(action['id']) == 26
assert action['dag_execution_date'] == '2017-09-06 14:10:08.528402'
assert action['dag_status'] == 'SCHEDULED'
assert action['committed_rev_id'] == 1
except ApiError:
assert False, 'Should not raise an ApiError'
validator.assert_called_once_with(action)
try:
action = action_resource.create_action(
action={'name': 'deploy_site'},
context=context,
allow_intermediate_commits=False)
assert action['timestamp']
assert action['id']
assert len(action['id']) == 26
assert action['dag_execution_date'] == '2017-09-06 14:10:08.528402'
assert action['dag_status'] == 'SCHEDULED'
assert action['committed_rev_id'] == 1
except ApiError:
assert False, 'Should not raise an ApiError'
full_val.assert_called_once_with(
action=action, configdocs_helper=action_resource.configdocs_helper)
ic_val.assert_called_once_with(
action=action, configdocs_helper=action_resource.configdocs_helper)
def test_create_action_validator_error():
action_resource = ActionsResource()
action_resource.get_all_actions_db = actions_db
action_resource.get_all_dag_runs_db = dag_runs_db
action_resource.get_all_tasks_db = tasks_db
action_resource.invoke_airflow_dag = airflow_stub
action_resource.insert_action = insert_action_stub
action_resource.audit_control_command_db = audit_control_command_db
action_resource.get_committed_design_version = lambda: DESIGN_VERSION
action_resource.check_intermediate_commit_revision = (
CHECK_INTERMEDIATE_COMMIT)
@mock.patch('shipyard_airflow.policy.check_auth')
@mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_deployment_action_basic')
@mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_deployment_action_full',
side_effect=ApiError(title='bad'))
@mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_intermediate_commits')
def test_create_action_validator_error(*args):
action_resource = _gen_action_resource_stubbed()
# with valid input and some parameters
with mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_site_action_full',
side_effect=ApiError(title='bad')):
with pytest.raises(ApiError) as apie:
with pytest.raises(ApiError) as apie:
action = action_resource.create_action(
action={'name': 'deploy_site',
'parameters': {
'a': 'aaa'
}},
context=context,
allow_intermediate_commits=False)
assert action['timestamp']
assert action['id']
assert len(action['id']) == 26
assert action['dag_execution_date'] == '2017-09-06 14:10:08.528402'
assert action['dag_status'] == 'SCHEDULED'
assert action['committed_rev_id'] == 1
assert apie.value.title == 'bad'
@mock.patch('shipyard_airflow.policy.check_auth')
@mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_deployment_action_basic')
def test_create_targeted_action_valid_input_and_params(basic_val, *args):
action_resource = _gen_action_resource_stubbed()
# with valid input and some parameters
try:
action = action_resource.create_action(
action={'name': 'redeploy_server',
'parameters': {
'target_nodes': ['node1']
}},
context=context,
allow_intermediate_commits=False)
assert action['timestamp']
assert action['id']
assert len(action['id']) == 26
assert action['dag_execution_date'] == '2017-09-06 14:10:08.528402'
assert action['dag_status'] == 'SCHEDULED'
assert action['committed_rev_id'] == 1
except ApiError:
assert False, 'Should not raise an ApiError'
basic_val.assert_called_once_with(
action=action, configdocs_helper=action_resource.configdocs_helper)
@mock.patch('shipyard_airflow.policy.check_auth')
@mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_deployment_action_basic')
def test_create_targeted_action_valid_input_missing_target(basic_val, *args):
action_resource = _gen_action_resource_stubbed()
# with an empty target_nodes parameter list
with pytest.raises(ApiError) as apie:
action = action_resource.create_action(
action={'name': 'redeploy_server',
'parameters': {
'target_nodes': []
}},
context=context,
allow_intermediate_commits=False)
assert action['timestamp']
assert action['id']
assert len(action['id']) == 26
assert action['dag_execution_date'] == '2017-09-06 14:10:08.528402'
assert action['dag_status'] == 'SCHEDULED'
assert action['committed_rev_id'] == 1
assert apie.value.title == 'Invalid target_nodes parameter'
assert not basic_val.called
@mock.patch('shipyard_airflow.policy.check_auth')
@mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_deployment_action_basic')
def test_create_targeted_action_valid_input_missing_param(basic_val, *args):
action_resource = _gen_action_resource_stubbed()
# with the parameters field missing entirely
with pytest.raises(ApiError) as apie:
action = action_resource.create_action(
action={'name': 'redeploy_server'},
context=context,
allow_intermediate_commits=False)
assert action['timestamp']
assert action['id']
assert len(action['id']) == 26
assert action['dag_execution_date'] == '2017-09-06 14:10:08.528402'
assert action['dag_status'] == 'SCHEDULED'
assert action['committed_rev_id'] == 1
assert apie.value.title == 'Invalid target_nodes parameter'
assert not basic_val.called
@mock.patch('shipyard_airflow.policy.check_auth')
@mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_deployment_action_basic')
def test_create_targeted_action_no_committed(basic_val, *args):
action_resource = _gen_action_resource_stubbed()
action_resource.get_committed_design_version = lambda: None
# with valid parameters, but no committed configdocs revision
with pytest.raises(ApiError) as apie:
action = action_resource.create_action(
action={'name': 'redeploy_server',
'parameters': {
'target_nodes': ['node1']
}},
context=context,
allow_intermediate_commits=False)
assert action['timestamp']
assert action['id']
assert len(action['id']) == 26
assert action['dag_execution_date'] == '2017-09-06 14:10:08.528402'
assert action['dag_status'] == 'SCHEDULED'
assert action['committed_rev_id'] == 1
assert apie.value.title == 'No committed configdocs'
assert not basic_val.called
# Purposefully raising Exception to test only the value passed to auth
@mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_deployment_action_basic',
side_effect=Exception('purposeful'))
@mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_deployment_action_full',
side_effect=Exception('purposeful'))
@mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_intermediate_commits',
side_effect=Exception('purposeful'))
@mock.patch('shipyard_airflow.control.action.action_validators'
'.validate_target_nodes',
side_effect=Exception('purposeful'))
@mock.patch('shipyard_airflow.policy.check_auth')
def test_auth_alignment(auth, *args):
action_resource = _gen_action_resource_stubbed()
for action_name, action_cfg in actions_api._action_mappings().items():
with pytest.raises(Exception) as ex:
action = action_resource.create_action(
action={'name': 'deploy_site',
'parameters': {
'a': 'aaa'
}},
action={'name': action_name},
context=context,
allow_intermediate_commits=False)
assert action['timestamp']
assert action['id']
assert len(action['id']) == 26
assert action['dag_execution_date'] == '2017-09-06 14:10:08.528402'
assert action['dag_status'] == 'SCHEDULED'
assert action['committed_rev_id'] == 1
assert apie.value.title == 'bad'
assert 'purposeful' in str(ex)
assert auth.called_with(action_cfg['rbac_policy'])
assert (action_cfg['rbac_policy'] ==
'workflow_orchestrator:action_{}'.format(action_name))
@patch('shipyard_airflow.db.shipyard_db.ShipyardDbAccess.'
@@ -536,12 +679,8 @@ def test_get_committed_design_version(*args):
@mock.patch.object(ConfigdocsHelper, 'get_revision_id', return_value=None)
def test_get_committed_design_version_missing(*args):
with pytest.raises(ApiError) as apie:
act_resource = ActionsResource()
act_resource.configdocs_helper = ConfigdocsHelper(
ShipyardRequestContext()
)
act_resource.get_committed_design_version()
assert apie.value.status == falcon.HTTP_404
assert apie.value.title == ('Unable to locate any committed revision in '
'Deckhand')
act_resource = ActionsResource()
act_resource.configdocs_helper = ConfigdocsHelper(
ShipyardRequestContext()
)
assert act_resource.get_committed_design_version() is None
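
test_auth_alignment pins down the naming convention between an action and its finer-grained policy rule. A hypothetical one-line helper expressing that convention (shipyard itself keeps the mapping in _action_mappings()):

def rbac_rule_for(action_name):
    # convention asserted by test_auth_alignment above
    return 'workflow_orchestrator:action_{}'.format(action_name)

assert rbac_rule_for('redeploy_server') == (
    'workflow_orchestrator:action_redeploy_server')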


@@ -0,0 +1,224 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for drydock_destroy_nodes operator functions"""
import os
from unittest import mock
from airflow.exceptions import AirflowException
import pytest
from shipyard_airflow.plugins.drydock_destroy_nodes import \
DrydockDestroyNodeOperator
from shipyard_airflow.plugins.drydock_errors import (
DrydockTaskFailedException,
DrydockTaskTimeoutException
)
CONF_FILE = os.path.join(os.path.dirname(__file__), 'test.conf')
ALL_SUCCESES = ['node1', 'node2', 'node3']
# The top level result should have all successes specified
TASK_DICT = {
'0': {
'result': {
'successes': ['node1', 'node2', 'node3'],
'status': 'success',
},
'subtask_id_list': ['1'],
'status': 'complete'
},
'1': {
'result': {
'successes': ['node3'],
'status': 'success',
},
'subtask_id_list': ['2', '3'],
'status': 'complete'
},
}
def _fake_get_task_dict(task_id):
return TASK_DICT[task_id]
class TestDrydockDestroyNodesOperator:
def test_setup_configured_values(self):
op = DrydockDestroyNodeOperator(main_dag_name="main",
shipyard_conf=CONF_FILE,
task_id="t1")
op.dc = {
'physical_provisioner.destroy_interval': 1,
'physical_provisioner.destroy_timeout': 10,
}
op.setup_configured_values()
assert op.dest_interval == 1
assert op.dest_timeout == 10
def test_success_functions(self, caplog):
op = DrydockDestroyNodeOperator(main_dag_name="main",
shipyard_conf=CONF_FILE,
task_id="t1")
# testing with lists and sets.
op.target_nodes = ['n0', 'n1', 'n2']
op.successes = ['n1']
caplog.clear()
op.report_summary()
assert " Nodes requested: n0, n1, n2" in caplog.text
assert " Nodes destroyed: n1" in caplog.text
assert " Nodes not destroyed: n0, n2" in caplog.text
assert "===== End Destroy" in caplog.text
assert not op.is_destroy_successful()
op.target_nodes = set(['n0', 'n1', 'n2'])
op.successes = []
caplog.clear()
op.report_summary()
assert " Nodes requested: n0, n1, n2" in caplog.text
assert " Nodes destroyed: " in caplog.text
assert " Nodes not destroyed: n0, n1, n2" in caplog.text
assert "===== End Destroy" in caplog.text
assert not op.is_destroy_successful()
op.target_nodes = set(['n0', 'n1', 'n2'])
op.successes = set(['n0', 'n1', 'n2'])
caplog.clear()
op.report_summary()
assert " Nodes requested: n0, n1, n2" in caplog.text
assert " Nodes destroyed: n0, n1, n2" in caplog.text
assert " Nodes not destroyed: " in caplog.text
assert "===== End Destroy" in caplog.text
assert op.is_destroy_successful()
@mock.patch.object(
DrydockDestroyNodeOperator, 'create_task'
)
@mock.patch.object(
DrydockDestroyNodeOperator, 'query_task'
)
def test_execute_destroy_simple_success(self, qt, ct, caplog):
op = DrydockDestroyNodeOperator(main_dag_name="main",
shipyard_conf=CONF_FILE,
task_id="t1")
op.dc = {
'physical_provisioner.destroy_interval': 1,
'physical_provisioner.destroy_timeout': 10,
}
op.setup_configured_values()
op.execute_destroy()
assert qt.called
assert ct.called
assert not caplog.records
@mock.patch.object(
DrydockDestroyNodeOperator, 'create_task'
)
@mock.patch.object(
DrydockDestroyNodeOperator, 'query_task',
side_effect=DrydockTaskFailedException("test")
)
def test_execute_destroy_query_fail(self, qt, ct, caplog):
op = DrydockDestroyNodeOperator(main_dag_name="main",
shipyard_conf=CONF_FILE,
task_id="t1")
op.dc = {
'physical_provisioner.destroy_interval': 1,
'physical_provisioner.destroy_timeout': 10,
}
op.setup_configured_values()
op.execute_destroy()
assert qt.called
assert ct.called
assert "Task destroy_nodes has failed." in caplog.text
@mock.patch.object(
DrydockDestroyNodeOperator, 'create_task'
)
@mock.patch.object(
DrydockDestroyNodeOperator, 'query_task',
side_effect=DrydockTaskTimeoutException("test")
)
def test_execute_destroy_query_timeout(self, qt, ct, caplog):
op = DrydockDestroyNodeOperator(main_dag_name="main",
shipyard_conf=CONF_FILE,
task_id="t1")
op.dc = {
'physical_provisioner.destroy_interval': 1,
'physical_provisioner.destroy_timeout': 10,
}
op.setup_configured_values()
op.execute_destroy()
assert qt.called
assert ct.called
assert "Task destroy_nodes has timed out after 10 seconds." in (
caplog.text)
@mock.patch.object(
DrydockDestroyNodeOperator, 'get_successes_for_task',
return_value=['n0', 'n1']
)
@mock.patch.object(
DrydockDestroyNodeOperator, 'create_task'
)
@mock.patch.object(
DrydockDestroyNodeOperator, 'query_task',
side_effect=DrydockTaskTimeoutException("test")
)
def test_do_execute_fail(self, qt, ct, gs, caplog):
op = DrydockDestroyNodeOperator(main_dag_name="main",
shipyard_conf=CONF_FILE,
task_id="t1")
op.dc = {
'physical_provisioner.destroy_interval': 1,
'physical_provisioner.destroy_timeout': 10,
}
op.target_nodes = ['n0', 'n1', 'n2']
with pytest.raises(AirflowException) as ae:
op.do_execute()
assert qt.called
assert ct.called
assert gs.called
assert "Task destroy_nodes has timed out after 10 seconds." in (
caplog.text)
assert ("One or more nodes requested for destruction failed to "
"destroy") in str(ae.value)
@mock.patch.object(
DrydockDestroyNodeOperator, 'get_successes_for_task',
return_value=['n0', 'n1', 'n2']
)
@mock.patch.object(
DrydockDestroyNodeOperator, 'create_task'
)
@mock.patch.object(
DrydockDestroyNodeOperator, 'query_task',
)
def test_do_execute(self, qt, ct, gs, caplog):
op = DrydockDestroyNodeOperator(main_dag_name="main",
shipyard_conf=CONF_FILE,
task_id="t1")
op.dc = {
'physical_provisioner.destroy_interval': 1,
'physical_provisioner.destroy_timeout': 10,
}
op.target_nodes = ['n0', 'n1', 'n2']
op.do_execute()
assert qt.called
assert ct.called
assert gs.called
assert " Nodes requested: n0, n1, n2" in caplog.text
assert " Nodes destroyed: n0, n1, n2" in caplog.text
assert " Nodes not destroyed: " in caplog.text
assert "===== End Destroy" in caplog.text


@@ -30,10 +30,13 @@ from shipyard_airflow.common.deployment_group.deployment_group_manager import (
DeploymentGroupManager
)
from shipyard_airflow.plugins.drydock_base_operator import (
gen_node_name_filter,
)
from shipyard_airflow.plugins.drydock_nodes import (
_default_deployment_strategy,
_gen_node_name_filter,
DrydockNodesOperator,
gen_simple_deployment_strategy,
_process_deployment_groups,
QueryTaskResult
)
@@ -176,7 +179,7 @@ DEP_STRAT = {'groups': yaml.safe_load(tdgm.GROUPS_YAML)}
def _fake_setup_ds(self):
self.strategy = DEP_STRAT
return DEP_STRAT
def _fake_get_task_dict(task_id):
@@ -217,7 +220,7 @@ class TestDrydockNodesOperator:
critical, has no selector values, and an all-or-nothing success
criteria
"""
s = _default_deployment_strategy()
s = gen_simple_deployment_strategy()
assert s['groups'][0]['name'] == 'default'
assert s['groups'][0]['critical']
assert s['groups'][0]['selectors'][0]['node_names'] == []
@@ -228,10 +231,24 @@
'percent_successful_nodes': 100
}
def test_targeted_deployment_strategy(self):
"""Test a deployment strategy used for a targeted deployment"""
s = gen_simple_deployment_strategy(name="targeted", nodes=['a', 'b'])
assert s['groups'][0]['name'] == 'targeted'
assert s['groups'][0]['critical']
assert s['groups'][0]['selectors'][0]['node_names'] == ['a', 'b']
assert s['groups'][0]['selectors'][0]['node_labels'] == []
assert s['groups'][0]['selectors'][0]['node_tags'] == []
assert s['groups'][0]['selectors'][0]['rack_names'] == []
assert s['groups'][0]['success_criteria'] == {
'percent_successful_nodes': 100
}
assert len(s['groups']) == 1
def test_gen_node_name_filter(self):
"""Test that a node name filter with only node_names is created"""
nodes = ['node1', 'node2']
f = _gen_node_name_filter(nodes)
f = gen_node_name_filter(nodes)
assert f['filter_set'][0]['node_names'] == nodes
assert len(f['filter_set']) == 1
@@ -242,7 +259,7 @@
assert op is not None
@mock.patch.object(DrydockNodesOperator, "get_unique_doc")
def test_setup_deployment_strategy(self, udoc):
def test_get_deployment_strategy(self, udoc):
"""Assert that the base class method get_unique_doc would be invoked
"""
op = DrydockNodesOperator(main_dag_name="main",
@@ -252,7 +269,7 @@
DeploymentConfigurationOperator.config_keys_defaults
)
op.dc['physical_provisioner.deployment_strategy'] = 'taco-salad'
op._setup_deployment_strategy()
op.setup_deployment_strategy()
udoc.assert_called_once_with(
name='taco-salad',
schema="shipyard/DeploymentStrategy/v1"
@@ -353,21 +370,21 @@
assert 'node4 failed to join Kubernetes' in caplog.text
assert len(task_res.successes) == 2
def test_get_successess_for_task(self):
def test_get_successes_for_task(self):
op = DrydockNodesOperator(main_dag_name="main",
shipyard_conf=CONF_FILE,
task_id="t1")
op.get_task_dict = _fake_get_task_dict
s = op._get_successes_for_task('0')
s = op.get_successes_for_task('0')
for i in range(1, 3):
assert "node{}".format(i) in s
def test_get_successess_for_task_more_logging(self):
def test_get_successes_for_task_more_logging(self):
op = DrydockNodesOperator(main_dag_name="main",
shipyard_conf=CONF_FILE,
task_id="t1")
op.get_task_dict = _fake_get_task_dict
s = op._get_successes_for_task('99')
s = op.get_successes_for_task('99')
for i in range(97, 98):
assert "node{}".format(i) in s
assert "node2" not in s
@@ -430,7 +447,7 @@
'_execute_deployment',
new=_gen_pe_func('all-success')
)
@mock.patch.object(DrydockNodesOperator, '_setup_deployment_strategy',
@mock.patch.object(DrydockNodesOperator, 'get_deployment_strategy',
new=_fake_setup_ds)
def test_do_execute_with_dgm(self, nl, caplog):
op = DrydockNodesOperator(main_dag_name="main",
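
For targeted deployments, the assertions above imply the generator folds the whole node list into a single critical group with an all-or-nothing success criterion. A sketch of that document shape (parameter names and defaults are inferred from the tests, not taken from the actual signature):

def sketch_simple_strategy(name='default', nodes=None):
    # shape inferred from the gen_simple_deployment_strategy assertions above
    return {
        'groups': [{
            'name': name,
            'critical': True,
            'selectors': [{
                'node_names': list(nodes or []),
                'node_labels': [],
                'node_tags': [],
                'rack_names': [],
            }],
            'success_criteria': {'percent_successful_nodes': 100},
        }]
    }

s = sketch_simple_strategy(name='targeted', nodes=['a', 'b'])
assert s['groups'][0]['selectors'][0]['node_names'] == ['a', 'b']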


@@ -40,7 +40,7 @@ DESC_ACTION = """
id of the action invoked so that it can be queried subsequently. \n
FORMAT: shipyard create action <action command> --param=<parameter>
(repeatable) [--allow-intermediate-commits] \n
EXAMPLE: shipyard create action redeploy_server --param="server-name=mcp"
EXAMPLE: shipyard create action redeploy_server --param="target_nodes=mcp"
shipyard create action update_site --param="continue-on-fail=true"
"""


@@ -30,13 +30,13 @@ def test_create_action():
"""test create_action works with action id and param input"""
action_name = 'redeploy_server'
param = '--param="server-name=mcp"'
param = '--param="target_nodes=mcp"'
runner = CliRunner()
with patch.object(CreateAction, '__init__') as mock_method:
runner.invoke(shipyard,
[auth_vars, 'create', 'action', action_name, param])
mock_method.assert_called_once_with(ANY, action_name,
{'"server-name': 'mcp"'}, False)
{'"target_nodes': 'mcp"'}, False)
def test_create_action_negative():


@@ -32,7 +32,7 @@ run_action () {
# Define Variables
action=$1
server=$2
servers=$2
# Define Color
NC='\033[0m'
@@ -49,11 +49,11 @@ run_action () {
# Note that deploy and update site do not require additional parameter
# to be passed in while redeploy_server requires user to indicate which
# server to redeploy
# servers to redeploy
if ! [[ ${server} ]] && [[ ${action} ]]; then
${base_docker_command} ${SHIPYARD_IMAGE} create action ${action}
elif [[ ${action} == 'redeploy_server' && ${server} ]]; then
${base_docker_command} ${SHIPYARD_IMAGE} create action redeploy_server --param="server-name=${server}"
elif [[ ${action} == 'redeploy_server' && ${servers} ]]; then
${base_docker_command} ${SHIPYARD_IMAGE} create action redeploy_server --param="target_nodes=${servers}"
else
echo "Invalid Input!"
exit 1


@@ -23,15 +23,15 @@ set -ex
# $ ./redeploy_server.sh controller01
#
if [[ -z "$1" ]]; then
echo -e "Please specify the server name!"
echo -e "Please specify the server names as a comma separated string."
exit 1
fi
# Define Variables
server=$1
servers=$1
# Source environment variables
source set_env
# Execute shipyard action for redeploy_server
bash execute_shipyard_action.sh 'redeploy_server' ${server}
bash execute_shipyard_action.sh 'redeploy_server' ${servers}