diff --git a/etc/shipyard/shipyard.conf.sample b/etc/shipyard/shipyard.conf.sample index 1c18d2cc..67fbb874 100644 --- a/etc/shipyard/shipyard.conf.sample +++ b/etc/shipyard/shipyard.conf.sample @@ -20,7 +20,13 @@ # # The web server for Airflow (string value) -#web_server = http://localhost:32080 +#web_server = http://localhost:32080/ + +# Seconds to wait to connect to the airflow api (integer value) +#airflow_api_connect_timeout = 5 + +# Seconds to wait for a response from the airflow api (integer value) +#airflow_api_read_timeout = 60 # The database for shipyard (string value) #postgresql_db = postgresql+psycopg2://shipyard:changeme@postgresql.ucp:5432/shipyard @@ -31,9 +37,6 @@ # The direcotry containing the alembic.ini file (string value) #alembic_ini_path = /home/shipyard/shipyard -# Upgrade the database on startup (boolean value) -#upgrade_db = true - [deckhand] @@ -58,39 +61,6 @@ # (string value) #service_type = physicalprovisioner -# Query interval (in seconds) for verify_site task (integer value) -#verify_site_query_interval = 10 - -# Time out (in seconds) for verify_site task (integer value) -#verify_site_task_timeout = 60 - -# Query interval (in seconds) for prepare_site task (integer value) -#prepare_site_query_interval = 10 - -# Time out (in seconds) for prepare_site task (integer value) -#prepare_site_task_timeout = 300 - -# Query interval (in seconds) for prepare_node task (integer value) -#prepare_node_query_interval = 30 - -# Time out (in seconds) for prepare_node task (integer value) -#prepare_node_task_timeout = 1800 - -# Query interval (in seconds) for deploy_node task (integer value) -#deploy_node_query_interval = 30 - -# Time out (in seconds) for deploy_node task (integer value) -#deploy_node_task_timeout = 3600 - -# Query interval (in seconds) for destroy_node task (integer value) -#destroy_node_query_interval = 30 - -# Time out (in seconds) for destroy_node task (integer value) -#destroy_node_task_timeout = 900 - -# Backoff time (in seconds) before checking cluster join (integer value) -#cluster_join_check_backoff_time = 120 - [keystone_authtoken] @@ -278,17 +248,22 @@ [requests_config] -# Deckhand client connect timeout (in seconds) + +# +# From shipyard_airflow +# + +# Deckhand client connect timeout (in seconds) (integer value) #deckhand_client_connect_timeout = 5 -# Deckhand client timeout (in seconds) for GET, -# PUT, POST and DELETE request +# Deckhand client timeout (in seconds) for GET, PUT, POST and DELETE request +# (integer value) #deckhand_client_read_timeout = 300 -# UCP component validation connect timeout (in seconds) +# UCP component validation connect timeout (in seconds) (integer value) #validation_connect_timeout = 5 -# UCP component validation timeout (in seconds) +# UCP component validation timeout (in seconds) (integer value) #validation_read_timeout = 300 diff --git a/shipyard_airflow/conf/config.py b/shipyard_airflow/conf/config.py index 76f8b43a..9a5fed93 100644 --- a/shipyard_airflow/conf/config.py +++ b/shipyard_airflow/conf/config.py @@ -32,6 +32,16 @@ SECTIONS = [ default='http://localhost:32080/', help='The web server for Airflow' ), + cfg.IntOpt( + 'airflow_api_connect_timeout', + default=5, + help='Seconds to wait to connect to the airflow api' + ), + cfg.IntOpt( + 'airflow_api_read_timeout', + default=60, + help='Seconds to wait for a response from the airflow api' + ), cfg.StrOpt( 'postgresql_db', default=( @@ -52,7 +62,7 @@ SECTIONS = [ 'alembic_ini_path', default='/home/shipyard/shipyard', help='The direcotry containing the alembic.ini file' - ) + ), ] ), ConfigSection( @@ -125,61 +135,6 @@ SECTIONS = [ 'the service lookup in the Keystone service catalog.' ) ), - cfg.IntOpt( - 'verify_site_query_interval', - default=10, - help='Query interval (in seconds) for verify_site task' - ), - cfg.IntOpt( - 'verify_site_task_timeout', - default=60, - help='Time out (in seconds) for verify_site task' - ), - cfg.IntOpt( - 'prepare_site_query_interval', - default=10, - help='Query interval (in seconds) for prepare_site task' - ), - cfg.IntOpt( - 'prepare_site_task_timeout', - default=300, - help='Time out (in seconds) for prepare_site task' - ), - cfg.IntOpt( - 'prepare_node_query_interval', - default=30, - help='Query interval (in seconds) for prepare_node task' - ), - cfg.IntOpt( - 'prepare_node_task_timeout', - default=1800, - help='Time out (in seconds) for prepare_node task' - ), - cfg.IntOpt( - 'deploy_node_query_interval', - default=30, - help='Query interval (in seconds) for deploy_node task' - ), - cfg.IntOpt( - 'deploy_node_task_timeout', - default=3600, - help='Time out (in seconds) for deploy_node task' - ), - cfg.IntOpt( - 'destroy_node_query_interval', - default=30, - help='Query interval (in seconds) for destroy_node task' - ), - cfg.IntOpt( - 'destroy_node_task_timeout', - default=900, - help='Time out (in seconds) for destroy_node task' - ), - cfg.IntOpt( - 'cluster_join_check_backoff_time', - default=120, - help='Backoff time (in seconds) before checking cluster join' - ), ] ), ConfigSection( diff --git a/shipyard_airflow/control/action/actions_api.py b/shipyard_airflow/control/action/actions_api.py index 760195d5..9f44971a 100644 --- a/shipyard_airflow/control/action/actions_api.py +++ b/shipyard_airflow/control/action/actions_api.py @@ -221,8 +221,13 @@ class ActionsResource(BaseResource): :param dag_id: the name of the dag to invoke :param action: the action structure to invoke the dag with """ + # TODO(bryan-strassner) refactor the mechanics of this method to an + # airflow api client module + # Retrieve URL web_server_url = CONF.base.web_server + c_timeout = CONF.base.airflow_api_connect_timeout + r_timeout = CONF.base.airflow_api_read_timeout if 'Error' in web_server_url: raise ApiError( @@ -232,7 +237,6 @@ class ActionsResource(BaseResource): 'value'), status=falcon.HTTP_503, retry=True, ) - else: conf_value = {'action': action} # "conf" - JSON string that gets pickled into the DagRun's @@ -242,7 +246,7 @@ class ActionsResource(BaseResource): dag_id, self.to_json(conf_value))) try: - resp = requests.get(req_url, timeout=(5, 15)) + resp = requests.get(req_url, timeout=(c_timeout, r_timeout)) LOG.info('Response code from Airflow trigger_dag: %s', resp.status_code) # any 4xx/5xx will be HTTPError, which are RequestException @@ -268,6 +272,8 @@ class ActionsResource(BaseResource): return dag_execution_date def _exhume_date(self, dag_id, log_string): + # TODO(bryan-strassner) refactor this to an airflow api client module + # we are unable to use the response time because that # does not match the time when the dag was recorded. # We have to parse the stdout returned to find the diff --git a/shipyard_airflow/dags/armada_deploy_site.py b/shipyard_airflow/dags/armada_deploy_site.py index 33aea9c2..7839f740 100644 --- a/shipyard_airflow/dags/armada_deploy_site.py +++ b/shipyard_airflow/dags/armada_deploy_site.py @@ -15,10 +15,7 @@ from airflow.models import DAG from airflow.operators import ArmadaOperator -# Location of shiyard.conf -# Note that the shipyard.conf file needs to be placed on a volume -# that can be accessed by the containers -config_path = '/usr/local/airflow/plugins/shipyard.conf' +from config_path import config_path def deploy_site_armada(parent_dag_name, child_dag_name, args): diff --git a/shipyard_airflow/dags/common_step_factory.py b/shipyard_airflow/dags/common_step_factory.py new file mode 100644 index 00000000..5fab959f --- /dev/null +++ b/shipyard_airflow/dags/common_step_factory.py @@ -0,0 +1,175 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from airflow.operators import ConcurrencyCheckOperator +from airflow.operators.python_operator import PythonOperator +from airflow.operators.subdag_operator import SubDagOperator + +from armada_deploy_site import deploy_site_armada +import dag_names as dn +from deckhand_get_design import get_design_deckhand +from destroy_node import destroy_server +from drydock_deploy_site import deploy_site_drydock +from failure_handlers import step_failure_handler +from dag_deployment_configuration import get_deployment_configuration +from preflight_checks import all_preflight_checks +from validate_site_design import validate_site_design + + +class CommonStepFactory(object): + """Common step factory + + A factory to generate steps that are reused among multiple dags + """ + def __init__(self, parent_dag_name, dag, default_args): + """Creates a factory + + Uses the specified parent_dag_name + """ + self.parent_dag_name = parent_dag_name + self.dag = dag + self.default_args = default_args + + def get_action_xcom(self, task_id=dn.ACTION_XCOM): + """Generate the action_xcom step + + Step responsible for getting the action information passed + by the invocation of the dag, which includes any options. + """ + def xcom_push(**kwargs): + """xcom_push function + + Defines a push function to store the content of 'action' that is + defined via 'dag_run' in XCOM so that it can be used by the + Operators + """ + + kwargs['ti'].xcom_push(key='action', + value=kwargs['dag_run'].conf['action']) + + return PythonOperator(task_id=task_id, + dag=self.dag, + python_callable=xcom_push) + + def get_concurrency_check(self, task_id=dn.DAG_CONCURRENCY_CHECK_DAG_NAME): + """Generate the concurrency check step + + Concurrency check prevents simultaneous execution of dags that should + not execute together. + """ + return ConcurrencyCheckOperator( + task_id=task_id, + on_failure_callback=step_failure_handler, + dag=self.dag) + + def get_preflight(self, task_id=dn.ALL_PREFLIGHT_CHECKS_DAG_NAME): + """Generate the preflight step + + Preflight checks preconditions for running a DAG + """ + return SubDagOperator( + subdag=all_preflight_checks( + self.parent_dag_name, + task_id, + args=self.default_args), + task_id=task_id, + on_failure_callback=step_failure_handler, + dag=self.dag) + + def get_get_design_version(self, task_id=dn.DECKHAND_GET_DESIGN_VERSION): + """Generate the get design version step + + Retrieves the version of the design to use from deckhand + """ + return SubDagOperator( + subdag=get_design_deckhand( + self.parent_dag_name, + task_id, + args=self.default_args), + task_id=task_id, + on_failure_callback=step_failure_handler, + dag=self.dag) + + def get_validate_site_design(self, + task_id=dn.VALIDATE_SITE_DESIGN_DAG_NAME): + """Generate the validate site design step + + Validation of the site design checks that the design to be used + for a deployment passes checks before using it. + """ + return SubDagOperator( + subdag=validate_site_design( + self.parent_dag_name, + task_id, + args=self.default_args), + task_id=task_id, + on_failure_callback=step_failure_handler, + dag=self.dag) + + def get_deployment_configuration(self, + task_id=dn.GET_DEPLOY_CONF_DAG_NAME): + """Generate the step to retrieve the deployment configuration + + This step provides the timings and strategies that will be used in + subsequent steps + """ + return SubDagOperator( + subdag=get_deployment_configuration( + self.parent_dag_name, + task_id, + args=self.default_args), + task_id=task_id, + on_failure_callback=step_failure_handler, + dag=self.dag) + + def get_drydock_build(self, task_id=dn.DRYDOCK_BUILD_DAG_NAME): + """Generate the drydock build step + + Drydock build does the hardware provisioning. + """ + return SubDagOperator( + subdag=deploy_site_drydock( + self.parent_dag_name, + task_id, + args=self.default_args), + task_id=task_id, + on_failure_callback=step_failure_handler, + dag=self.dag) + + def get_armada_build(self, task_id=dn.ARMADA_BUILD_DAG_NAME): + """Generate the armada build step + + Armada build does the deployment of helm charts + """ + return SubDagOperator( + subdag=deploy_site_armada( + self.parent_dag_name, + task_id, + args=self.default_args), + task_id=task_id, + on_failure_callback=step_failure_handler, + dag=self.dag) + + def get_destroy_server(self, task_id=dn.DESTROY_SERVER_DAG_NAME): + """Generate a destroy server step + + Destroy server tears down kubernetes and hardware + """ + return SubDagOperator( + subdag=destroy_server( + self.parent_dag_name, + task_id, + args=self.default_args), + task_id=task_id, + on_failure_callback=step_failure_handler, + dag=self.dag) diff --git a/shipyard_airflow/dags/config_path.py b/shipyard_airflow/dags/config_path.py new file mode 100644 index 00000000..514e419d --- /dev/null +++ b/shipyard_airflow/dags/config_path.py @@ -0,0 +1,18 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Location of shiyard.conf +# Note that the shipyard.conf file needs to be placed on a volume +# that can be accessed by the containers +config_path = '/usr/local/airflow/plugins/shipyard.conf' diff --git a/shipyard_airflow/dags/dag_deployment_configuration.py b/shipyard_airflow/dags/dag_deployment_configuration.py new file mode 100644 index 00000000..e595186a --- /dev/null +++ b/shipyard_airflow/dags/dag_deployment_configuration.py @@ -0,0 +1,36 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from airflow.models import DAG +from airflow.operators import DeploymentConfigurationOperator + +from config_path import config_path + + +GET_DEPLOYMENT_CONFIGURATION_NAME = 'get_deployment_configuration' + + +def get_deployment_configuration(parent_dag_name, child_dag_name, args): + """DAG to retrieve deployment configuration""" + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args) + + deployment_configuration = DeploymentConfigurationOperator( + task_id=GET_DEPLOYMENT_CONFIGURATION_NAME, + shipyard_conf=config_path, + main_dag_name=parent_dag_name, + dag=dag) + + return dag diff --git a/shipyard_airflow/dags/dag_names.py b/shipyard_airflow/dags/dag_names.py new file mode 100644 index 00000000..b1bcad88 --- /dev/null +++ b/shipyard_airflow/dags/dag_names.py @@ -0,0 +1,26 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Subdags +ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight' +ARMADA_BUILD_DAG_NAME = 'armada_build' +DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check' +DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version' +GET_DEPLOY_CONF_DAG_NAME = 'dag_deployment_configuration' +DRYDOCK_BUILD_DAG_NAME = 'drydock_build' +VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design' +DESTROY_SERVER_DAG_NAME = 'destroy_server' + +# Steps +ACTION_XCOM = 'action_xcom' diff --git a/shipyard_airflow/dags/deckhand_get_design.py b/shipyard_airflow/dags/deckhand_get_design.py index e8d71a8e..138e23e5 100644 --- a/shipyard_airflow/dags/deckhand_get_design.py +++ b/shipyard_airflow/dags/deckhand_get_design.py @@ -16,11 +16,7 @@ from airflow.models import DAG from airflow.operators import DeckhandGetDesignOperator from airflow.operators import DeckhandRetrieveRenderedDocOperator - -# Location of shiyard.conf -# Note that the shipyard.conf file needs to be placed on a volume -# that can be accessed by the containers -config_path = '/usr/local/airflow/plugins/shipyard.conf' +from config_path import config_path def get_design_deckhand(parent_dag_name, child_dag_name, args): diff --git a/shipyard_airflow/dags/deploy_site.py b/shipyard_airflow/dags/deploy_site.py index 2dbc4e31..fa40767f 100644 --- a/shipyard_airflow/dags/deploy_site.py +++ b/shipyard_airflow/dags/deploy_site.py @@ -1,4 +1,4 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,29 +14,16 @@ from datetime import timedelta import airflow -import failure_handlers from airflow import DAG -from airflow.operators import ConcurrencyCheckOperator -from airflow.operators.python_operator import PythonOperator -from airflow.operators.subdag_operator import SubDagOperator -from armada_deploy_site import deploy_site_armada -from deckhand_get_design import get_design_deckhand -from drydock_deploy_site import deploy_site_drydock -from preflight_checks import all_preflight_checks -from validate_site_design import validate_site_design -""" -deploy_site is the top-level orchestration DAG for deploying a site using the -Undercloud platform. +from common_step_factory import CommonStepFactory +"""deploy_site + +the top-level orchestration DAG for deploying a site using the Undercloud +platform. """ -ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight' -ARMADA_BUILD_DAG_NAME = 'armada_build' -DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check' -DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version' -DRYDOCK_BUILD_DAG_NAME = 'drydock_build' PARENT_DAG_NAME = 'deploy_site' -VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design' default_args = { 'owner': 'airflow', @@ -51,66 +38,28 @@ default_args = { } dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None) -""" -Define push function to store the content of 'action' that is -defined via 'dag_run' in XCOM so that it can be used by the -Operators -""" +step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME, + dag=dag, + default_args=default_args) -def xcom_push(**kwargs): - # Pushes action XCom - kwargs['ti'].xcom_push(key='action', - value=kwargs['dag_run'].conf['action']) - - -action_xcom = PythonOperator( - task_id='action_xcom', dag=dag, python_callable=xcom_push) - -concurrency_check = ConcurrencyCheckOperator( - task_id=DAG_CONCURRENCY_CHECK_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -preflight = SubDagOperator( - subdag=all_preflight_checks( - PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME, args=default_args), - task_id=ALL_PREFLIGHT_CHECKS_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -get_design_version = SubDagOperator( - subdag=get_design_deckhand( - PARENT_DAG_NAME, DECKHAND_GET_DESIGN_VERSION, args=default_args), - task_id=DECKHAND_GET_DESIGN_VERSION, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -validate_site_design = SubDagOperator( - subdag=validate_site_design( - PARENT_DAG_NAME, VALIDATE_SITE_DESIGN_DAG_NAME, args=default_args), - task_id=VALIDATE_SITE_DESIGN_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -drydock_build = SubDagOperator( - subdag=deploy_site_drydock( - PARENT_DAG_NAME, DRYDOCK_BUILD_DAG_NAME, args=default_args), - task_id=DRYDOCK_BUILD_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -armada_build = SubDagOperator( - subdag=deploy_site_armada( - PARENT_DAG_NAME, ARMADA_BUILD_DAG_NAME, args=default_args), - task_id=ARMADA_BUILD_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) +action_xcom = step_factory.get_action_xcom() +concurrency_check = step_factory.get_concurrency_check() +preflight = step_factory.get_preflight() +get_design_version = step_factory.get_get_design_version() +validate_site_design = step_factory.get_validate_site_design() +deployment_configuration = step_factory.get_deployment_configuration() +drydock_build = step_factory.get_drydock_build() +armada_build = step_factory.get_armada_build() # DAG Wiring concurrency_check.set_upstream(action_xcom) preflight.set_upstream(concurrency_check) get_design_version.set_upstream(preflight) validate_site_design.set_upstream(get_design_version) -drydock_build.set_upstream(validate_site_design) +deployment_configuration.set_upstream(get_design_version) +drydock_build.set_upstream([ + validate_site_design, + deployment_configuration +]) armada_build.set_upstream(drydock_build) diff --git a/shipyard_airflow/dags/destroy_node.py b/shipyard_airflow/dags/destroy_node.py index 4c8ade20..ea7f5a0e 100644 --- a/shipyard_airflow/dags/destroy_node.py +++ b/shipyard_airflow/dags/destroy_node.py @@ -20,11 +20,7 @@ from airflow.operators import PromenadeDecommissionNodeOperator from airflow.operators import PromenadeDrainNodeOperator from airflow.operators import PromenadeShutdownKubeletOperator - -# Location of shiyard.conf -# Note that the shipyard.conf file needs to be placed on a volume -# that can be accessed by the containers -config_path = '/usr/local/airflow/plugins/shipyard.conf' +from config_path import config_path def destroy_server(parent_dag_name, child_dag_name, args): diff --git a/shipyard_airflow/dags/drydock_deploy_site.py b/shipyard_airflow/dags/drydock_deploy_site.py index 0dd9bae8..651c320e 100644 --- a/shipyard_airflow/dags/drydock_deploy_site.py +++ b/shipyard_airflow/dags/drydock_deploy_site.py @@ -15,11 +15,7 @@ from airflow.models import DAG from airflow.operators import DryDockOperator - -# Location of shiyard.conf -# Note that the shipyard.conf file needs to be placed on a volume -# that can be accessed by the containers -config_path = '/usr/local/airflow/plugins/shipyard.conf' +from config_path import config_path def deploy_site_drydock(parent_dag_name, child_dag_name, args): diff --git a/shipyard_airflow/dags/preflight_checks.py b/shipyard_airflow/dags/preflight_checks.py index 83bff791..121c0a18 100644 --- a/shipyard_airflow/dags/preflight_checks.py +++ b/shipyard_airflow/dags/preflight_checks.py @@ -16,11 +16,7 @@ from airflow.models import DAG from airflow.operators import K8sHealthCheckOperator from airflow.operators import UcpHealthCheckOperator - -# Location of shiyard.conf -# Note that the shipyard.conf file needs to be placed on a volume -# that can be accessed by the containers -config_path = '/usr/local/airflow/plugins/shipyard.conf' +from config_path import config_path def all_preflight_checks(parent_dag_name, child_dag_name, args): diff --git a/shipyard_airflow/dags/redeploy_server.py b/shipyard_airflow/dags/redeploy_server.py index 5d0d25aa..32da66c0 100644 --- a/shipyard_airflow/dags/redeploy_server.py +++ b/shipyard_airflow/dags/redeploy_server.py @@ -1,4 +1,4 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,29 +14,16 @@ from datetime import timedelta import airflow -import failure_handlers from airflow import DAG -from airflow.operators import ConcurrencyCheckOperator -from airflow.operators.python_operator import PythonOperator -from airflow.operators.subdag_operator import SubDagOperator -from deckhand_get_design import get_design_deckhand -from destroy_node import destroy_server -from drydock_deploy_site import deploy_site_drydock -from preflight_checks import all_preflight_checks -from validate_site_design import validate_site_design -""" -redeploy_server is the top-level orchestration DAG for redeploying a -server using the Undercloud platform. +from common_step_factory import CommonStepFactory +"""redeploy_server + +The top-level orchestration DAG for redeploying a server using the Undercloud +platform. """ -ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight' -DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check' -DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version' -DESTROY_SERVER_DAG_NAME = 'destroy_server' -DRYDOCK_BUILD_DAG_NAME = 'drydock_build' PARENT_DAG_NAME = 'redeploy_server' -VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design' default_args = { 'owner': 'airflow', @@ -51,66 +38,29 @@ default_args = { } dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None) -""" -Define push function to store the content of 'action' that is -defined via 'dag_run' in XCOM so that it can be used by the -Operators -""" + +step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME, + dag=dag, + default_args=default_args) -def xcom_push(**kwargs): - # Pushes action XCom - kwargs['ti'].xcom_push(key='action', - value=kwargs['dag_run'].conf['action']) - - -action_xcom = PythonOperator( - task_id='action_xcom', dag=dag, python_callable=xcom_push) - -concurrency_check = ConcurrencyCheckOperator( - task_id=DAG_CONCURRENCY_CHECK_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -preflight = SubDagOperator( - subdag=all_preflight_checks( - PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME, args=default_args), - task_id=ALL_PREFLIGHT_CHECKS_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -get_design_version = SubDagOperator( - subdag=get_design_deckhand( - PARENT_DAG_NAME, DECKHAND_GET_DESIGN_VERSION, args=default_args), - task_id=DECKHAND_GET_DESIGN_VERSION, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -validate_site_design = SubDagOperator( - subdag=validate_site_design( - PARENT_DAG_NAME, VALIDATE_SITE_DESIGN_DAG_NAME, args=default_args), - task_id=VALIDATE_SITE_DESIGN_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -destroy_server = SubDagOperator( - subdag=destroy_server( - PARENT_DAG_NAME, DESTROY_SERVER_DAG_NAME, args=default_args), - task_id=DESTROY_SERVER_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -drydock_build = SubDagOperator( - subdag=deploy_site_drydock( - PARENT_DAG_NAME, DRYDOCK_BUILD_DAG_NAME, args=default_args), - task_id=DRYDOCK_BUILD_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) +action_xcom = step_factory.get_action_xcom() +concurrency_check = step_factory.get_concurrency_check() +preflight = step_factory.get_preflight() +get_design_version = step_factory.get_get_design_version() +validate_site_design = step_factory.get_validate_site_design() +deployment_configuration = step_factory.get_deployment_configuration() +destroy_server = step_factory.get_destroy_server() +drydock_build = step_factory.get_drydock_build() # DAG Wiring concurrency_check.set_upstream(action_xcom) preflight.set_upstream(concurrency_check) get_design_version.set_upstream(preflight) validate_site_design.set_upstream(get_design_version) -destroy_server.set_upstream(validate_site_design) +deployment_configuration.set_upstream(get_design_version) +destroy_server.set_upstream([ + validate_site_design, + deployment_configuration +]) drydock_build.set_upstream(destroy_server) diff --git a/shipyard_airflow/dags/update_site.py b/shipyard_airflow/dags/update_site.py index 2df71004..13dad988 100644 --- a/shipyard_airflow/dags/update_site.py +++ b/shipyard_airflow/dags/update_site.py @@ -1,4 +1,4 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,32 +14,21 @@ from datetime import timedelta import airflow -import failure_handlers from airflow import DAG -from airflow.operators import ConcurrencyCheckOperator -from airflow.operators.python_operator import PythonOperator -from airflow.operators.subdag_operator import SubDagOperator -from armada_deploy_site import deploy_site_armada -from deckhand_get_design import get_design_deckhand -from drydock_deploy_site import deploy_site_drydock -from validate_site_design import validate_site_design -""" -update_site is the top-level orchestration DAG for updating a site using the -Undercloud platform. +from common_step_factory import CommonStepFactory + +"""update_site + +The top-level orchestration DAG for updating a site using the Undercloud +platform. TODO: We will disable pre-flight checks for now and will revisit it at a later date. The pre-flight checks will be more targeted in the case of 'update_site' and will include specific checks on things like coredns, calico and ceph. """ - -ARMADA_BUILD_DAG_NAME = 'armada_build' -DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check' -DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version' -DRYDOCK_BUILD_DAG_NAME = 'drydock_build' PARENT_DAG_NAME = 'update_site' -VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design' default_args = { 'owner': 'airflow', @@ -54,58 +43,26 @@ default_args = { } dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None) -""" -Define push function to store the content of 'action' that is -defined via 'dag_run' in XCOM so that it can be used by the -Operators -""" +step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME, + dag=dag, + default_args=default_args) -def xcom_push(**kwargs): - # Pushes action XCom - kwargs['ti'].xcom_push(key='action', - value=kwargs['dag_run'].conf['action']) - - -action_xcom = PythonOperator( - task_id='action_xcom', dag=dag, python_callable=xcom_push) - -concurrency_check = ConcurrencyCheckOperator( - task_id=DAG_CONCURRENCY_CHECK_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -get_design_version = SubDagOperator( - subdag=get_design_deckhand( - PARENT_DAG_NAME, DECKHAND_GET_DESIGN_VERSION, args=default_args), - task_id=DECKHAND_GET_DESIGN_VERSION, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -validate_site_design = SubDagOperator( - subdag=validate_site_design( - PARENT_DAG_NAME, VALIDATE_SITE_DESIGN_DAG_NAME, args=default_args), - task_id=VALIDATE_SITE_DESIGN_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -drydock_build = SubDagOperator( - subdag=deploy_site_drydock( - PARENT_DAG_NAME, DRYDOCK_BUILD_DAG_NAME, args=default_args), - task_id=DRYDOCK_BUILD_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -armada_build = SubDagOperator( - subdag=deploy_site_armada( - PARENT_DAG_NAME, ARMADA_BUILD_DAG_NAME, args=default_args), - task_id=ARMADA_BUILD_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) +action_xcom = step_factory.get_action_xcom() +concurrency_check = step_factory.get_concurrency_check() +get_design_version = step_factory.get_get_design_version() +validate_site_design = step_factory.get_validate_site_design() +deployment_configuration = step_factory.get_deployment_configuration() +drydock_build = step_factory.get_drydock_build() +armada_build = step_factory.get_armada_build() # DAG Wiring concurrency_check.set_upstream(action_xcom) get_design_version.set_upstream(concurrency_check) validate_site_design.set_upstream(get_design_version) -drydock_build.set_upstream(validate_site_design) +deployment_configuration.set_upstream(get_design_version) +drydock_build.set_upstream([ + validate_site_design, + deployment_configuration +]) armada_build.set_upstream(drydock_build) diff --git a/shipyard_airflow/dags/validate_site_design.py b/shipyard_airflow/dags/validate_site_design.py index 37d8d22f..9778a1c6 100644 --- a/shipyard_airflow/dags/validate_site_design.py +++ b/shipyard_airflow/dags/validate_site_design.py @@ -17,10 +17,7 @@ from airflow.operators import ArmadaOperator from airflow.operators import DeckhandValidateSiteDesignOperator from airflow.operators import DryDockOperator -# Location of shiyard.conf -# Note that the shipyard.conf file needs to be placed on a volume -# that can be accessed by the containers -config_path = '/usr/local/airflow/plugins/shipyard.conf' +from config_path import config_path def validate_site_design(parent_dag_name, child_dag_name, args): diff --git a/shipyard_airflow/plugins/armada_operator.py b/shipyard_airflow/plugins/armada_operator.py index 683e7119..fc0b94c8 100644 --- a/shipyard_airflow/plugins/armada_operator.py +++ b/shipyard_airflow/plugins/armada_operator.py @@ -28,6 +28,7 @@ import armada.common.session as session from get_k8s_pod_port_ip import get_pod_port_ip from service_endpoint import ucp_service_endpoint from service_token import shipyard_service_token +from xcom_puller import XcomPuller class ArmadaOperator(BaseOperator): @@ -37,6 +38,9 @@ class ArmadaOperator(BaseOperator): :param main_dag_name: Parent Dag :param shipyard_conf: Location of shipyard.conf :param sub_dag_name: Child Dag + + The Drydock operator assumes that prior steps have set xcoms for + the action and the deployment configuration """ @apply_defaults @@ -46,7 +50,6 @@ class ArmadaOperator(BaseOperator): shipyard_conf=None, svc_token=None, sub_dag_name=None, - workflow_info={}, xcom_push=True, *args, **kwargs): @@ -56,7 +59,6 @@ class ArmadaOperator(BaseOperator): self.shipyard_conf = shipyard_conf self.svc_token = svc_token self.sub_dag_name = sub_dag_name - self.workflow_info = workflow_info self.xcom_push_flag = xcom_push def execute(self, context): @@ -67,16 +69,12 @@ class ArmadaOperator(BaseOperator): # Define task_instance task_instance = context['task_instance'] - # Extract information related to current workflow - # The workflow_info variable will be a dictionary - # that contains information about the workflow such - # as action_id, name and other related parameters - workflow_info = task_instance.xcom_pull( - task_ids='action_xcom', key='action', - dag_id=self.main_dag_name) + # Set up and retrieve values from xcom + self.xcom_puller = XcomPuller(self.main_dag_name, task_instance) + self.action_info = self.xcom_puller.get_action_info() # Logs uuid of action performed by the Operator - logging.info("Armada Operator for action %s", workflow_info['id']) + logging.info("Armada Operator for action %s", self.action_info['id']) # Retrieve Deckhand Design Reference design_ref = self.get_deckhand_design_ref(context) @@ -108,6 +106,10 @@ class ArmadaOperator(BaseOperator): return site_design_validity + # Set up target manifest (only if not doing validate) + self.dc = self.xcom_puller.get_deployment_configuration() + self.target_manifest = self.dc['armada.manifest'] + # Create Armada Client # Retrieve Endpoint Information svc_type = 'armada' @@ -128,13 +130,8 @@ class ArmadaOperator(BaseOperator): # Armada Apply elif self.action == 'armada_apply': - # TODO (bryan-strassner) externalize the name of the manifest to - # use this needs to come from a site configuration document for - # consumption by shipyard/airflow. For now. "full-site" is the - # only value that will work. - target_manifest = 'full-site' self.armada_apply(context, armada_client, design_ref, - target_manifest) + self.target_manifest) # Armada Get Releases elif self.action == 'armada_get_releases': @@ -268,14 +265,7 @@ class ArmadaOperator(BaseOperator): logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint) # Retrieve revision_id from xcom - # Note that in the case of 'deploy_site', the dag_id will - # be 'deploy_site.deckhand_get_design_version' for the - # 'deckhand_get_design_version' task. We need to extract - # the xcom value from it in order to get the value of the - # last committed revision ID - committed_revision_id = context['task_instance'].xcom_pull( - task_ids='deckhand_get_design_version', - dag_id=self.main_dag_name + '.deckhand_get_design_version') + committed_revision_id = self.xcom_puller.get_design_version() # Form Design Reference Path that we will use to retrieve # the Design YAMLs diff --git a/shipyard_airflow/plugins/concurrency_check_operator.py b/shipyard_airflow/plugins/concurrency_check_operator.py index 50ced2e6..9e99eb59 100644 --- a/shipyard_airflow/plugins/concurrency_check_operator.py +++ b/shipyard_airflow/plugins/concurrency_check_operator.py @@ -56,9 +56,14 @@ class ConcurrencyCheckOperator(BaseOperator): @apply_defaults def __init__(self, conflicting_dag_set=None, *args, **kwargs): super(ConcurrencyCheckOperator, self).__init__(*args, **kwargs) - if conflicting_dag_set is not None: - self.conflicting_dag_set = conflicting_dag_set - else: + self.conflicting_dag_set = conflicting_dag_set + + def execute(self, context): + """ + Run the check to see if this DAG has an concurrency issues with other + DAGs. Stop the workflow if there is. + """ + if self.conflicting_dag_set is None: self.check_dag_id = self.dag.dag_id logging.debug('dag_id is %s', self.check_dag_id) if '.' in self.dag.dag_id: @@ -70,11 +75,6 @@ class ConcurrencyCheckOperator(BaseOperator): self.conflicting_dag_set = find_conflicting_dag_set( self.check_dag_id) - def execute(self, context): - """ - Run the check to see if this DAG has an concurrency issues with other - DAGs. Stop the workflow if there is. - """ logging.info('Checking for running of dags: %s', ', '.join(self.conflicting_dag_set)) @@ -123,7 +123,7 @@ class ConcurrencyCheckOperator(BaseOperator): """ conflict_string = '{} conflicts with running {}. Aborting run'.format( dag_name, conflict) - logging.warning(conflict_string) + logging.error(conflict_string) raise AirflowException(conflict_string) diff --git a/shipyard_airflow/plugins/deckhand_base_operator.py b/shipyard_airflow/plugins/deckhand_base_operator.py index 60d4cca3..53b703db 100644 --- a/shipyard_airflow/plugins/deckhand_base_operator.py +++ b/shipyard_airflow/plugins/deckhand_base_operator.py @@ -23,6 +23,7 @@ from airflow.exceptions import AirflowException from deckhand.client import client as deckhand_client from service_endpoint import ucp_service_endpoint from service_token import shipyard_service_token +from xcom_puller import XcomPuller class DeckhandBaseOperator(BaseOperator): @@ -49,7 +50,6 @@ class DeckhandBaseOperator(BaseOperator): svc_session=None, svc_token=None, validation_read_timeout=None, - workflow_info={}, xcom_push=True, *args, **kwargs): """Initialization of DeckhandBaseOperator object. @@ -66,7 +66,6 @@ class DeckhandBaseOperator(BaseOperator): :param svc_session: Keystone Session :param svc_token: Keystone Token :param validation_read_timeout: Deckhand validation timeout - :param workflow_info: Information related to current workflow :param xcom_push: xcom usage """ @@ -84,7 +83,6 @@ class DeckhandBaseOperator(BaseOperator): self.svc_session = svc_session self.svc_token = svc_token self.validation_read_timeout = validation_read_timeout - self.workflow_info = workflow_info self.xcom_push_flag = xcom_push def execute(self, context): @@ -117,17 +115,13 @@ class DeckhandBaseOperator(BaseOperator): # Define task_instance task_instance = context['task_instance'] - # Extract information related to current workflow - # The workflow_info variable will be a dictionary - # that contains information about the workflow such - # as action_id, name and other related parameters - self.workflow_info = task_instance.xcom_pull( - task_ids='action_xcom', key='action', - dag_id=self.main_dag_name) + # Set up and retrieve values from xcom + self.xcom_puller = XcomPuller(self.main_dag_name, task_instance) + self.action_info = self.xcom_puller.get_action_info() # Logs uuid of Shipyard action logging.info("Executing Shipyard Action %s", - self.workflow_info['id']) + self.action_info['id']) # Retrieve Endpoint Information self.deckhand_svc_endpoint = ucp_service_endpoint( @@ -158,9 +152,7 @@ class DeckhandBaseOperator(BaseOperator): if self.task_id != 'deckhand_get_design_version': # Retrieve 'revision_id' from xcom - self.revision_id = task_instance.xcom_pull( - task_ids='deckhand_get_design_version', - dag_id=self.main_dag_name + '.deckhand_get_design_version') + self.revision_id = self.xcom_puller.get_design_version() if self.revision_id: logging.info("Revision ID is %d", self.revision_id) diff --git a/shipyard_airflow/plugins/deckhand_client_factory.py b/shipyard_airflow/plugins/deckhand_client_factory.py new file mode 100644 index 00000000..3efa8c8c --- /dev/null +++ b/shipyard_airflow/plugins/deckhand_client_factory.py @@ -0,0 +1,66 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import configparser +import logging + +from keystoneauth1.identity import v3 as keystone_v3 +from keystoneauth1 import session as keystone_session + +from deckhand.client import client as deckhand_client + +LOG = logging.getLogger(__name__) + + +class DeckhandClientFactory(object): + """Factory for DeckhandClient to encapsulate commonly reused setup""" + + def __init__(self, + shipyard_conf, + *args, **kwargs): + """Deckhand Client Factory + + Creates a client factory to retrieve clients + :param shipyard_conf: Location of shipyard.conf + """ + self.config = configparser.ConfigParser() + self.config.read(shipyard_conf) + + def get_client(self): + """Retrieve a deckhand client""" + + """ + Notes: + TODO(bryan-strassner): If/when the airflow plugin modules move to using + oslo config, consider using the example here: + https://github.com/att-comdev/deckhand/blob/cef3b52a104e620e88a24caf70ed2bb1297c268f/deckhand/barbican/client_wrapper.py#L53 + which will load the attributes from the config more flexibly. + Keystoneauth1 also provides for a simpler solution with: + https://docs.openstack.org/keystoneauth/latest/api/keystoneauth1.loading.html + if oslo config is used. + """ + keystone_auth = {} + # Construct Session Argument + for attr in ('auth_url', 'password', 'project_domain_name', + 'project_name', 'username', 'user_domain_name'): + keystone_auth[attr] = self.config.get('keystone_authtoken', attr) + + # Set up keystone session + auth = keystone_v3.Password(**keystone_auth) + sess = keystone_session.Session(auth=auth) + + LOG.info("Setting up Deckhand client with parameters") + for attr in keystone_auth: + if attr != 'password': + LOG.debug('%s = %s', attr, keystone_auth[attr]) + return deckhand_client.Client(session=sess, endpoint_type='internal') diff --git a/shipyard_airflow/plugins/deployment_configuration_operator.py b/shipyard_airflow/plugins/deployment_configuration_operator.py new file mode 100644 index 00000000..5e15d924 --- /dev/null +++ b/shipyard_airflow/plugins/deployment_configuration_operator.py @@ -0,0 +1,178 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Deployment Configuration + +Retrieves the deployment configuration from Deckhand and places the values +retrieved into a dictionary +""" +import logging + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.plugins_manager import AirflowPlugin +from airflow.utils.decorators import apply_defaults + +try: + from deckhand_client_factory import DeckhandClientFactory +except ImportError: + from shipyard_airflow.plugins.deckhand_client_factory import ( + DeckhandClientFactory + ) + +LOG = logging.getLogger(__name__) + + +class DeploymentConfigurationOperator(BaseOperator): + """Deployment Configuration Operator + + Retrieve the deployment configuration from Deckhand for use throughout + the workflow. Put the configuration into a dictionary. + + Failures are raised: + - when Deckhand cannot be contacted + - when the DeploymentConfiguration (deployment-configuration) document + cannot be retrieved + """ + config_keys_defaults = { + "physical_provisioner.deployment_strategy": "all-at-once", + "physical_provisioner.deploy_interval": 30, + "physical_provisioner.deploy_timeout": 3600, + "physical_provisioner.destroy_interval": 30, + "physical_provisioner.destroy_timeout": 900, + "physical_provisioner.join_wait": 120, + "physical_provisioner.prepare_node_interval": 30, + "physical_provisioner.prepare_node_timeout": 1000, + "physical_provisioner.prepare_site_interval": 10, + "physical_provisioner.prepare_site_timeout": 300, + "physical_provisioner.verify_interval": 10, + "physical_provisioner.verify_timeout": 60, + "kubernetes.node_status_interval": 30, + "kubernetes.node_status_timeout": 1800, + "kubernetes_provisioner.drain_timeout": 3600, + "kubernetes_provisioner.drain_grace_period": 1800, + "kubernetes_provisioner.clear_labels_timeout": 1800, + "kubernetes_provisioner.remove_etcd_timeout": 1800, + "kubernetes_provisioner.etcd_ready_timeout": 600, + "armada.manifest": "full-site" + } + + @apply_defaults + def __init__(self, + main_dag_name=None, + shipyard_conf=None, + *args, **kwargs): + """Deployment Configuration Operator + + Generate a DeploymentConfigurationOperator to read the deployment's + configuration for use by other operators + + :param main_dag_name: Parent Dag + :param shipyard_conf: Location of shipyard.conf + """ + + super(DeploymentConfigurationOperator, self).__init__(*args, **kwargs) + self.main_dag_name = main_dag_name + self.shipyard_conf = shipyard_conf + + def execute(self, context): + """Perform Deployment Configuration extraction""" + + revision_id = self.get_revision_id(context.get('task_instance')) + doc = self.get_doc(revision_id) + converted = self.map_config_keys(doc) + # return the mapped configuration so that it can be placed on xcom + return converted + + def get_revision_id(self, task_instance): + """Get the revision id from xcom""" + if task_instance: + LOG.debug("task_instance found, extracting design version") + # Set the revision_id to the revision on the xcom + revision_id = task_instance.xcom_pull( + task_ids='deckhand_get_design_version', + dag_id=self.main_dag_name + '.deckhand_get_design_version') + if revision_id: + LOG.info("Revision is set to: %s for deployment configuration", + revision_id) + return revision_id + # either revision id was not on xcom, or the task_instance is messed + raise AirflowException( + "Design_revision is not set. Cannot proceed with retrieval of" + " the design configuration" + ) + + def get_doc(self, revision_id): + """Get the DeploymentConfiguration document dictionary from Deckhand""" + LOG.info( + "Attempting to retrieve shipyard/DeploymentConfiguration/v1, " + "deployment-configuration from Deckhand" + ) + filters = { + "schema": "shipyard/DeploymentConfiguration/v1", + "metadata.name": "deployment-configuration" + } + try: + dhclient = DeckhandClientFactory(self.shipyard_conf).get_client() + LOG.info("Deckhand Client acquired") + doc = dhclient.revisions.documents(revision_id, + rendered=True, + **filters) + except Exception as ex: + try: + failed_url = ex.url + except AttributeError: + failed_url = "No URL generated" + LOG.exception(ex) + raise AirflowException("Failed to retrieve deployment " + "configuration yaml using url: " + "{}".format(failed_url)) + + if len(doc) == 1 and doc[0].data: + doc_dict = doc[0].data + else: + raise AirflowException("A valid deployment-configuration is " + "required") + + LOG.info("DeploymentConfiguration retrieved") + return doc_dict + + def map_config_keys(self, cfg_data): + """Maps the deployment-configuration + + Converts to a more simple map of key-value pairs + """ + LOG.info("Mapping keys from deployment configuration") + return { + cfg_key: self.get_cfg_value(cfg_data, cfg_key, cfg_default) + for cfg_key, cfg_default in + DeploymentConfigurationOperator.config_keys_defaults.items() + } + + def get_cfg_value(self, cfg_data, cfg_key, cfg_default): + """Uses the dot notation key to get the value from the design config""" + data = cfg_data + for node in cfg_key.split('.'): + data = data.get(node, {}) + if data: + LOG.info("Deployment Config value set- %s: %s", cfg_key, data) + return data + else: + LOG.info("Deployment Config using default- %s: %s", + cfg_key, cfg_default) + return cfg_default + + +class DeploymentConfigurationOperatorPlugin(AirflowPlugin): + name = 'deployment_configuration_operator_plugin' + operators = [DeploymentConfigurationOperator] diff --git a/shipyard_airflow/plugins/drydock_operators.py b/shipyard_airflow/plugins/drydock_operators.py index c2188f40..c87c531b 100644 --- a/shipyard_airflow/plugins/drydock_operators.py +++ b/shipyard_airflow/plugins/drydock_operators.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import configparser import json import logging import os @@ -31,20 +30,11 @@ from check_k8s_node_status import check_node_status from drydock_provisioner import error as errors from service_endpoint import ucp_service_endpoint from service_token import shipyard_service_token +from xcom_puller import XcomPuller class DryDockOperator(BaseOperator): - """ - DryDock Client - :param action: Task to perform - :param design_ref: A URI reference to the design documents - :param main_dag_name: Parent Dag - :param node_filter: A filter for narrowing the scope of the task. Valid - fields are 'node_names', 'rack_names', 'node_tags' - :param shipyard_conf: Location of shipyard.conf - :param sub_dag_name: Child Dag - :param workflow_info: Information related to the workflow - """ + """DryDock Client""" @apply_defaults def __init__(self, action=None, @@ -54,9 +44,20 @@ class DryDockOperator(BaseOperator): shipyard_conf=None, svc_token=None, sub_dag_name=None, - workflow_info={}, xcom_push=True, *args, **kwargs): + """ + :param action: Task to perform + :param design_ref: A URI reference to the design documents + :param main_dag_name: Parent Dag + :param node_filter: A filter for narrowing the scope of the task. Valid + fields are 'node_names', 'rack_names', 'node_tags' + :param shipyard_conf: Location of shipyard.conf + :param sub_dag_name: Child Dag + + The Drydock operator assumes that prior steps have set xcoms for + the action and the deployment configuration + """ super(DryDockOperator, self).__init__(*args, **kwargs) self.action = action @@ -66,7 +67,6 @@ class DryDockOperator(BaseOperator): self.shipyard_conf = shipyard_conf self.svc_token = svc_token self.sub_dag_name = sub_dag_name - self.workflow_info = workflow_info self.xcom_push_flag = xcom_push def execute(self, context): @@ -81,22 +81,19 @@ class DryDockOperator(BaseOperator): # Define task_instance task_instance = context['task_instance'] - # Extract information related to current workflow - # The workflow_info variable will be a dictionary - # that contains information about the workflow such - # as action_id, name and other related parameters - workflow_info = task_instance.xcom_pull( - task_ids='action_xcom', key='action', - dag_id=self.main_dag_name) + # Set up and retrieve values from xcom + self.xcom_puller = XcomPuller(self.main_dag_name, task_instance) + self.action_info = self.xcom_puller.get_action_info() + self.dc = self.xcom_puller.get_deployment_configuration() # Logs uuid of action performed by the Operator - logging.info("DryDock Operator for action %s", workflow_info['id']) + logging.info("DryDock Operator for action %s", self.action_info['id']) # Retrieve information of the server that we want to redeploy if user # executes the 'redeploy_server' dag # Set node filter to be the server that we want to redeploy - if workflow_info['dag_id'] == 'redeploy_server': - redeploy_server = workflow_info['parameters'].get('server-name') + if self.action_info['dag_id'] == 'redeploy_server': + redeploy_server = self.action_info['parameters'].get('server-name') if redeploy_server: logging.info("Server to be redeployed is %s", redeploy_server) @@ -139,83 +136,56 @@ class DryDockOperator(BaseOperator): # Set up DryDock Client drydock_client = self.drydock_session_client(drydock_svc_endpoint) - # Read shipyard.conf - config = configparser.ConfigParser() - config.read(self.shipyard_conf) - - if not config.read(self.shipyard_conf): - raise AirflowException("Unable to read content of shipyard.conf") - # Create Task for verify_site if self.action == 'verify_site': - - # Default settings for 'verify_site' execution is to query - # the task every 10 seconds and to time out after 60 seconds - query_interval = config.get('drydock', - 'verify_site_query_interval') - task_timeout = config.get('drydock', 'verify_site_task_timeout') - + q_interval = self.dc['physical_provisioner.verify_interval'] + task_timeout = self.dc['physical_provisioner.verify_timeout'] self.drydock_action(drydock_client, context, self.action, - query_interval, task_timeout) + q_interval, task_timeout) # Create Task for prepare_site elif self.action == 'prepare_site': - # Default settings for 'prepare_site' execution is to query - # the task every 10 seconds and to time out after 300 seconds - query_interval = config.get('drydock', - 'prepare_site_query_interval') - task_timeout = config.get('drydock', 'prepare_site_task_timeout') - + q_interval = self.dc['physical_provisioner.prepare_site_interval'] + task_timeout = self.dc['physical_provisioner.prepare_site_timeout'] self.drydock_action(drydock_client, context, self.action, - query_interval, task_timeout) + q_interval, task_timeout) # Create Task for prepare_node elif self.action == 'prepare_nodes': - # Default settings for 'prepare_node' execution is to query - # the task every 30 seconds and to time out after 1800 seconds - query_interval = config.get('drydock', - 'prepare_node_query_interval') - task_timeout = config.get('drydock', 'prepare_node_task_timeout') - + q_interval = self.dc['physical_provisioner.prepare_node_interval'] + task_timeout = self.dc['physical_provisioner.prepare_node_timeout'] self.drydock_action(drydock_client, context, self.action, - query_interval, task_timeout) + q_interval, task_timeout) # Create Task for deploy_node elif self.action == 'deploy_nodes': - # Default settings for 'deploy_node' execution is to query - # the task every 30 seconds and to time out after 3600 seconds - query_interval = config.get('drydock', - 'deploy_node_query_interval') - task_timeout = config.get('drydock', 'deploy_node_task_timeout') - + q_interval = self.dc['physical_provisioner.deploy_interval'] + task_timeout = self.dc['physical_provisioner.deploy_timeout'] self.drydock_action(drydock_client, context, self.action, - query_interval, task_timeout) + q_interval, task_timeout) # Wait for 120 seconds (default value) before checking the cluster # join process as it takes time for process to be triggered across # all nodes - cluster_join_check_backoff_time = config.get( - 'drydock', 'cluster_join_check_backoff_time') + join_wait = self.dc['physical_provisioner.join_wait'] logging.info("All nodes deployed in MAAS") logging.info("Wait for %d seconds before checking node state...", - int(cluster_join_check_backoff_time)) - time.sleep(int(cluster_join_check_backoff_time)) - + join_wait) + time.sleep(join_wait) # Check that cluster join process is completed before declaring - # deploy_node as 'completed'. Set time out to 30 minutes and set - # polling interval to 30 seconds. - check_node_status(1800, 30) + # deploy_node as 'completed'. + node_st_timeout = self.dc['kubernetes.node_status_timeout'] + node_st_interval = self.dc['kubernetes.node_status_interval'] + check_node_status(node_st_timeout, node_st_interval) # Create Task for destroy_node # NOTE: This is a PlaceHolder function. The 'destroy_node' # functionalities in DryDock is being worked on and is not # ready at the moment. elif self.action == 'destroy_node': - # Default settings for 'destroy_node' execution is to query - # the task every 30 seconds and to time out after 900 seconds - query_interval = config.get('drydock', - 'destroy_node_query_interval') - task_timeout = config.get('drydock', 'destroy_node_task_timeout') + # see deployment_configuration_operator.py for defaults + q_interval = self.dc['physical_provisioner.destroy_interval'] + task_timeout = self.dc['physical_provisioner.destroy_timeout'] logging.info("Destroying node %s from cluster...", redeploy_server) time.sleep(15) @@ -224,7 +194,7 @@ class DryDockOperator(BaseOperator): # TODO: Uncomment when the function to destroy/delete node is # ready for consumption in Drydock # self.drydock_action(drydock_client, context, self.action, - # query_interval, task_timeout) + # q_interval, task_timeout) # Do not perform any action else: @@ -403,15 +373,7 @@ class DryDockOperator(BaseOperator): svc_type=svc_type) logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint) - # Retrieve revision_id from xcom - # Note that in the case of 'deploy_site', the dag_id will - # be 'deploy_site.deckhand_get_design_version' for the - # 'deckhand_get_design_version' task. We need to extract - # the xcom value from it in order to get the value of the - # last committed revision ID - committed_revision_id = context['task_instance'].xcom_pull( - task_ids='deckhand_get_design_version', - dag_id=self.main_dag_name + '.deckhand_get_design_version') + committed_revision_id = self.xcom_puller.get_design_version() # Form DeckHand Design Reference Path that we will use to retrieve # the DryDock YAMLs diff --git a/shipyard_airflow/plugins/promenade_base_operator.py b/shipyard_airflow/plugins/promenade_base_operator.py index 39637a0e..1389f9e1 100644 --- a/shipyard_airflow/plugins/promenade_base_operator.py +++ b/shipyard_airflow/plugins/promenade_base_operator.py @@ -21,6 +21,7 @@ from airflow.exceptions import AirflowException from service_endpoint import ucp_service_endpoint from service_token import shipyard_service_token +from xcom_puller import XcomPuller class PromenadeBaseOperator(BaseOperator): @@ -30,7 +31,6 @@ class PromenadeBaseOperator(BaseOperator): All promenade related workflow operators will use the promenade base operator as the parent and inherit attributes and methods from this class - """ @apply_defaults @@ -42,7 +42,6 @@ class PromenadeBaseOperator(BaseOperator): shipyard_conf=None, sub_dag_name=None, svc_token=None, - workflow_info={}, xcom_push=True, *args, **kwargs): """Initialization of PromenadeBaseOperator object. @@ -54,9 +53,9 @@ class PromenadeBaseOperator(BaseOperator): :param shipyard_conf: Path of shipyard.conf :param sub_dag_name: Child Dag :param svc_token: Keystone Token - :param workflow_info: Information related to current workflow :param xcom_push: xcom usage - + The Drydock operator assumes that prior steps have set xcoms for + the action and the deployment configuration """ super(PromenadeBaseOperator, self).__init__(*args, @@ -68,11 +67,9 @@ class PromenadeBaseOperator(BaseOperator): self.shipyard_conf = shipyard_conf self.sub_dag_name = sub_dag_name self.svc_token = svc_token - self.workflow_info = workflow_info self.xcom_push_flag = xcom_push def execute(self, context): - # Execute promenade base function self.promenade_base(context) @@ -84,22 +81,18 @@ class PromenadeBaseOperator(BaseOperator): # Define task_instance task_instance = context['task_instance'] - # Extract information related to current workflow - # The workflow_info variable will be a dictionary - # that contains information about the workflow such - # as action_id, name and other related parameters - self.workflow_info = task_instance.xcom_pull( - task_ids='action_xcom', key='action', - dag_id=self.main_dag_name) + # Set up and retrieve values from xcom + self.xcom_puller = XcomPuller(self.main_dag_name, task_instance) + self.action_info = self.xcom_puller.get_action_info() + self.dc = self.xcom_puller.get_deployment_configuration() # Logs uuid of Shipyard action - logging.info("Executing Shipyard Action %s", - self.workflow_info['id']) + logging.info("Executing Shipyard Action %s", self.action_info['id']) # Retrieve information of the server that we want to redeploy # if user executes the 'redeploy_server' dag - if self.workflow_info['dag_id'] == 'redeploy_server': - self.redeploy_server = self.workflow_info['parameters'].get( + if self.action_info['dag_id'] == 'redeploy_server': + self.redeploy_server = self.action_info['parameters'].get( 'server-name') if self.redeploy_server: diff --git a/shipyard_airflow/plugins/promenade_check_etcd.py b/shipyard_airflow/plugins/promenade_check_etcd.py index d4f5e200..f0a406ac 100644 --- a/shipyard_airflow/plugins/promenade_check_etcd.py +++ b/shipyard_airflow/plugins/promenade_check_etcd.py @@ -33,6 +33,10 @@ class PromenadeCheckEtcdOperator(PromenadeBaseOperator): def do_execute(self): # Placeholder function. Updates will be made when the Promenade # API is ready for consumption. + + # TODO(bryan-strassner) use: + # self.dc['kubernetes_provisioner.etcd_ready_timeout'] + # self.dc['kubernetes_provisioner.remove_etcd_timeout'] logging.info("Performing health check on etcd...") time.sleep(5) diff --git a/shipyard_airflow/plugins/promenade_clear_labels.py b/shipyard_airflow/plugins/promenade_clear_labels.py index 526f8b78..a78a9f8d 100644 --- a/shipyard_airflow/plugins/promenade_clear_labels.py +++ b/shipyard_airflow/plugins/promenade_clear_labels.py @@ -33,6 +33,10 @@ class PromenadeClearLabelsOperator(PromenadeBaseOperator): def do_execute(self): # Placeholder function. Updates will be made when the Promenade # API is ready for consumption. + + # TODO(bryan-strassner) use: + # self.dc['kubernetes_provisioner.clear_labels_timeout'] + logging.info("Removing labels on node...") time.sleep(5) diff --git a/shipyard_airflow/plugins/promenade_drain_node.py b/shipyard_airflow/plugins/promenade_drain_node.py index 7fea0c28..1822bdf7 100644 --- a/shipyard_airflow/plugins/promenade_drain_node.py +++ b/shipyard_airflow/plugins/promenade_drain_node.py @@ -35,6 +35,11 @@ class PromenadeDrainNodeOperator(PromenadeBaseOperator): def do_execute(self): # Placeholder function. Updates will be made when the Promenade # API is ready for consumption. + + # TODO(bryan-strassner) use: + # self.dc['kubernetes_provisioner.drain_timeout'] + # self.dc['kubernetes_provisioner.drain_grace_period'] + logging.info("Draining node...") time.sleep(5) diff --git a/shipyard_airflow/plugins/xcom_puller.py b/shipyard_airflow/plugins/xcom_puller.py new file mode 100644 index 00000000..5b3ddef9 --- /dev/null +++ b/shipyard_airflow/plugins/xcom_puller.py @@ -0,0 +1,84 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +LOG = logging.getLogger(__name__) + + +class XcomPuller(object): + """XcomPuller provides a common source to get reused xcom values + + One XcomPuller should be created per task. + Note: xcom values are found by using the current task instance + and finding the . that the xcom was added + to the workflow. + The point of this class is to keep all this very configurable + naming in one place as much as possible so that changes to + the dag names and step names have less places to update. + """ + + def __init__(self, main_dag_name, task_instance): + self.mdn = main_dag_name + self.ti = task_instance + + def _get_xcom(self, source_task, dag_id=None, key=None, log_result=True): + """Find a particular xcom value""" + if dag_id is None: + source_dag = self.mdn + else: + source_dag = "{}.{}".format(self.mdn, dag_id) + LOG.info("Retrieving xcom from %s.%s with key %s", + source_dag, + source_task, + key) + xcom_val = self.ti.xcom_pull(task_ids=source_task, + dag_id=source_dag, + key=key) + if log_result: + # log the xcom value - don't put large values in xcom! + LOG.info(xcom_val) + + return xcom_val + + def get_deployment_configuration(self): + """Retrieve the deployment configuration dictionary""" + source_task = 'get_deployment_configuration' + source_dag = 'dag_deployment_configuration' + key = None + return self._get_xcom(source_task=source_task, + dag_id=source_dag, + key=key) + + def get_action_info(self): + """Retrive the action and action parameter info dictionary + + Extract information related to current workflow. This is a dictionary + that contains information about the workflow such as action_id, name + and other related parameters + """ + source_task = 'action_xcom' + source_dag = None + key = 'action' + return self._get_xcom(source_task=source_task, + dag_id=source_dag, + key=key) + + def get_design_version(self): + """Retrieve the design version being used for this workflow""" + source_task = 'deckhand_get_design_version' + source_dag = 'deckhand_get_design_version' + key = None + return self._get_xcom(source_task=source_task, + dag_id=source_dag, + key=key) diff --git a/shipyard_airflow/schemas/deploymentConfiguration.yaml b/shipyard_airflow/schemas/deploymentConfiguration.yaml new file mode 100644 index 00000000..ce6ae1aa --- /dev/null +++ b/shipyard_airflow/schemas/deploymentConfiguration.yaml @@ -0,0 +1,76 @@ + +--- +schema: 'deckhand/DataSchema/v1' +metadata: + schema: metadata/Control/v1 + name: shipyard/DeploymentConfiguration/v1 + labels: + application: shipyard +data: + $schema: 'http://json-schema.org/schema#' + id: 'https://github.com/att-comdev/shipyard/deploymentConfiguration.yaml' + type: 'object' + properties: + physical_provisioner: + type: 'object' + properties: + deployment_strategy: + type: 'string' + enum: + - 'all-at-once' + deploy_interval: + type: 'integer' + deploy_timeout: + type: 'integer' + destroy_interval: + type: 'integer' + destroy_timeout: + type: 'integer' + join_wait: + type: 'integer' + prepare_node_interval: + type: 'integer' + prepare_node_timeout: + type: 'integer' + prepare_site_interval: + type: 'integer' + prepare_site_timeout: + type: 'integer' + verify_interval: + type: 'integer' + verify_timeout: + type: 'integer' + additionalProperties: false + kubernetes: + type: 'object' + properties: + node_status_interval: + type: 'integer' + node_status_timeout: + type: 'integer' + additionalProperties: false + kubernetes_provisioner: + type: 'object' + properties: + drain_timeout: + type: 'integer' + drain_grace_period: + type: 'integer' + clear_labels_timeout: + type: 'integer' + remove_etcd_timeout: + type: 'integer' + etcd_ready_timeout: + type: 'integer' + additionalProperties: false + armada: + type: 'object' + properties: + manifest: + type: 'string' + additionalProperties: false + required: + - manifest + additionalProperties: false + required: + - armada diff --git a/test-requirements.txt b/test-requirements.txt index 04a07f7f..8c2f45df 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -5,6 +5,10 @@ mock==2.0.0 responses==0.8.1 testfixtures==5.1.1 apache-airflow[crypto,celery,postgres,hive,hdfs,jdbc]==1.9.0 +git+https://github.com/att-comdev/deckhand.git@master#egg=deckhand +git+https://github.com/att-comdev/promenade.git@master#egg=promenade +git+https://github.com/att-comdev/drydock.git@master#egg=drydock +git+https://github.com/att-comdev/armada.git@master#egg=armada # Linting flake8==3.3.0 diff --git a/tests/unit/control/test.conf b/tests/unit/control/test.conf index d9324273..93d1327c 100644 --- a/tests/unit/control/test.conf +++ b/tests/unit/control/test.conf @@ -5,6 +5,8 @@ upgrade_db = false postgresql_airflow_db = postgresql+psycopg2://airflow:password@postgresql.ucp.svc.cluster.local:5432/airflow postgresql_db = postgresql+psycopg2://shipyard:password@postgresql.ucp.svc.cluster.local:5432/shipyard web_server = http://airflow-web-int.ucp.svc.cluster.local:8080/ +airflow_api_connect_timeout = 5 +airflow_api_read_timeout = 60 [deckhand] service_type = deckhand [drydock] diff --git a/tests/unit/plugins/test.conf b/tests/unit/plugins/test.conf new file mode 100644 index 00000000..2f223440 --- /dev/null +++ b/tests/unit/plugins/test.conf @@ -0,0 +1,15 @@ +[keystone_authtoken] +auth_section = keystone_authtoken +auth_type = password +auth_uri = http://keystone-api.ucp.svc.cluster.local:80/v3 +auth_url = http://keystone-api.ucp.svc.cluster.local:80/v3 +auth_version = v3 +delay_auth_decision = true +memcache_secret_key = zwe6wa59AykCCMk4ucOwEbAkmLSXLOYRharO39FYHY0WYlQnxMwTIJna6NBzJskm +memcache_security_strategy = None +memcached_servers = memcached.ucp.svc.cluster.local:11211 +password = password +project_domain_name = default +project_name = service +user_domain_name = default +username = shipyard diff --git a/tests/unit/plugins/test_deckhand_client_factory.py b/tests/unit/plugins/test_deckhand_client_factory.py new file mode 100644 index 00000000..044f4cc7 --- /dev/null +++ b/tests/unit/plugins/test_deckhand_client_factory.py @@ -0,0 +1,29 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os + +from deckhand.client import client as deckhand_client + +from shipyard_airflow.plugins.deckhand_client_factory import ( + DeckhandClientFactory +) + + +def test_get_client(): + """Test the get_client functionality""" + cur_dir = os.path.dirname(__file__) + filename = os.path.join(cur_dir, 'test.conf') + client_factory = DeckhandClientFactory(filename) + client = client_factory.get_client() + assert isinstance(client, deckhand_client.Client) diff --git a/tests/unit/plugins/test_deployment_configuration_operator.py b/tests/unit/plugins/test_deployment_configuration_operator.py new file mode 100644 index 00000000..24c55aa6 --- /dev/null +++ b/tests/unit/plugins/test_deployment_configuration_operator.py @@ -0,0 +1,158 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import mock +import pytest +import yaml + +import airflow +from airflow.exceptions import AirflowException + +try: + from deployment_configuration_operator import ( + DeploymentConfigurationOperator + ) +except ImportError: + from shipyard_airflow.plugins.deployment_configuration_operator import ( + DeploymentConfigurationOperator + ) + +try: + from deckhand_client_factory import DeckhandClientFactory +except ImportError: + from shipyard_airflow.plugins.deckhand_client_factory import ( + DeckhandClientFactory + ) + + +def test_execute_exception(): + """Test that execute results in a failure with bad context""" + + dco = DeploymentConfigurationOperator(main_dag_name="main", + shipyard_conf="shipyard.conf", + task_id="t1") + with pytest.raises(AirflowException) as expected_exc: + # Design revision is not set on xcom pull + dco.execute(context={}) + assert ("Design_revision is not set. Cannot proceed with retrieval" + " of the design configuration") in str(expected_exc) + + +@mock.patch.object(DeploymentConfigurationOperator, 'get_revision_id', + return_value=99) +def test_execute_no_client(p1): + # no keystone authtoken present in configuration + dco = DeploymentConfigurationOperator(main_dag_name="main", + shipyard_conf="shipyard.conf", + task_id="t1") + with pytest.raises(AirflowException) as expected_exc: + dco.execute(context={}) + assert ("Failed to retrieve deployment configuration yaml") in str( + expected_exc) + + +@mock.patch.object(airflow.models.TaskInstance, 'xcom_pull', + return_value=99) +def test_get_revision_id(ti): + """Test that get revision id follows desired exits""" + dco = DeploymentConfigurationOperator(main_dag_name="main", + shipyard_conf="shipyard.conf", + task_id="t1") + ti = airflow.models.TaskInstance(task=mock.MagicMock(), + execution_date="no") + rid = dco.get_revision_id(ti) + assert rid == 99 + + +@mock.patch.object(airflow.models.TaskInstance, 'xcom_pull', + return_value=None) +def test_get_revision_id_none(ti): + """Test that get revision id follows desired exits""" + dco = DeploymentConfigurationOperator(main_dag_name="main", + shipyard_conf="shipyard.conf", + task_id="t1") + ti = airflow.models.TaskInstance(task=mock.MagicMock(), execution_date="o") + with pytest.raises(AirflowException) as expected_exc: + rid = dco.get_revision_id(ti) + assert "Design_revision is not set." in str(expected_exc) + + +def test_get_doc_no_deckhand(): + """Get doc should fail to contact deckhand return a document""" + dco = DeploymentConfigurationOperator(main_dag_name="main", + shipyard_conf="shipyard.conf", + task_id="t1") + with pytest.raises(AirflowException) as expected_exc: + dco.get_doc(99) + assert "Failed to retrieve deployment" in str(expected_exc) + + +def get_m_client(data): + doc_obj = mock.MagicMock() + doc_obj.data = data + doc_obj_l = [doc_obj] + mock_client = mock.MagicMock() + mock_client.revisions.documents = lambda r, rendered, **filters: doc_obj_l + return mock_client + + +@mock.patch.object(DeckhandClientFactory, 'get_client', + return_value=get_m_client('abcdefg')) +def test_get_doc_mock_deckhand(p1): + """Get doc should return a document""" + dco = DeploymentConfigurationOperator(main_dag_name="main", + shipyard_conf="shipyard.conf", + task_id="t1") + + doc = dco.get_doc(99) + assert doc == 'abcdefg' + + +@mock.patch.object(DeckhandClientFactory, 'get_client', + return_value=get_m_client(None)) +def test_get_doc_mock_deckhand_invalid(p1): + """Get doc should return a document""" + dco = DeploymentConfigurationOperator(main_dag_name="main", + shipyard_conf="shipyard.conf", + task_id="t1") + + with pytest.raises(AirflowException) as airflow_ex: + dco.get_doc(99) + assert 'valid deployment-configuration' in str(airflow_ex) + + +sample_deployment_config = """ + physical_provisioner: + deployment_strategy: all-at-once + deploy_interval: 900 + kubernetes_provisioner: + drain_timeout: 3600 + drain_grace_period: 1800 + clear_labels_timeout: 1800 + remove_etcd_timeout: 1800 + etcd_ready_timeout: 600 + armada: + manifest: 'full-site'""" + + +def test_map_config_keys(): + """Should reutrn the new dict from the yaml dict""" + yaml_dict = yaml.safe_load(sample_deployment_config) + dco = DeploymentConfigurationOperator(main_dag_name="main", + shipyard_conf="shipyard.conf", + task_id="t1") + mapped = dco.map_config_keys(yaml_dict) + for key in DeploymentConfigurationOperator.config_keys_defaults: + assert key in mapped + assert mapped.get("physical_provisioner.deploy_interval") == 900 + assert mapped.get("physical_provisioner.verify_timeout") == 60 diff --git a/tests/unit/schemas/__init__.py b/tests/unit/schemas/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/schemas/test_deployment_configuration.py b/tests/unit/schemas/test_deployment_configuration.py new file mode 100644 index 00000000..2700ad5f --- /dev/null +++ b/tests/unit/schemas/test_deployment_configuration.py @@ -0,0 +1,78 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +import os +import yaml + +import jsonschema +import pkg_resources +import pytest +import shutil + +from jsonschema.exceptions import ValidationError + +LOG = logging.getLogger(__name__) + + +class BaseSchemaValidationTest(object): + def _test_validate(self, schema, expect_failure, input_files, input): + """validates input yaml against schema. + :param schema: schema yaml file + :param expect_failure: should the validation pass or fail. + :param input_files: pytest fixture used to access the test input files + :param input: test input yaml doc filename""" + schema_dir = pkg_resources.resource_filename('shipyard_airflow', + 'schemas') + schema_filename = os.path.join(schema_dir, schema) + schema_file = open(schema_filename, 'r') + schema = yaml.safe_load(schema_file) + + input_file = input_files.join(input) + instance_file = open(str(input_file), 'r') + instance = yaml.safe_load(instance_file) + + LOG.info('Input: %s, Schema: %s', input_file, schema_filename) + + if expect_failure: + with pytest.raises(ValidationError): + jsonschema.validate(instance['data'], schema['data']) + else: + jsonschema.validate(instance['data'], schema['data']) + + +class TestValidation(BaseSchemaValidationTest): + def test_validate_deploy_config_full_valid(self, input_files): + self._test_validate('deploymentConfiguration.yaml', False, input_files, + 'deploymentConfiguration_full_valid.yaml') + + def test_validate_deploy_config_bad_manifest(self, input_files): + self._test_validate('deploymentConfiguration.yaml', True, input_files, + 'deploymentConfiguration_bad_manifest.yaml') + + def test_validate_deploy_config_minimal_valid(self, input_files): + self._test_validate('deploymentConfiguration.yaml', False, input_files, + 'deploymentConfiguration_minimal_valid.yaml') + + @pytest.fixture(scope='module') + def input_files(self, tmpdir_factory, request): + tmpdir = tmpdir_factory.mktemp('data') + samples_dir = os.path.dirname(str( + request.fspath)) + "/" + "../yaml_samples" + samples = os.listdir(samples_dir) + + for f in samples: + src_file = samples_dir + "/" + f + dst_file = str(tmpdir) + "/" + f + shutil.copyfile(src_file, dst_file) + return tmpdir diff --git a/tests/unit/yaml_samples/deploymentConfiguration_bad_manifest.yaml b/tests/unit/yaml_samples/deploymentConfiguration_bad_manifest.yaml new file mode 100644 index 00000000..7e6d5c32 --- /dev/null +++ b/tests/unit/yaml_samples/deploymentConfiguration_bad_manifest.yaml @@ -0,0 +1,13 @@ +--- +schema: shipyard/DeploymentConfiguration/v1 +metadata: + schema: metadata/Document/v1 + name: deployment-configuration + layeringDefinition: + abstract: false + layer: global + storagePolicy: cleartext +data: + armada: + manifest: + bad_name_field: 'full-site' diff --git a/tests/unit/yaml_samples/deploymentConfiguration_full_valid.yaml b/tests/unit/yaml_samples/deploymentConfiguration_full_valid.yaml new file mode 100644 index 00000000..7501b172 --- /dev/null +++ b/tests/unit/yaml_samples/deploymentConfiguration_full_valid.yaml @@ -0,0 +1,31 @@ +--- +schema: shipyard/DeploymentConfiguration/v1 +metadata: + schema: metadata/Document/v1 + name: deployment-configuration + layeringDefinition: + abstract: false + layer: global + storagePolicy: cleartext +data: + physical_provisioner: + deployment_strategy: all-at-once + deploy_interval: 30 + deploy_timeout: 3600 + destroy_interval: 30 + destroy_timeout: 900 + join_wait: 120 + prepare_node_interval: 30 + prepare_node_timeout: 1000 + prepare_site_interval: 10 + prepare_site_timeout: 300 + verify_interval: 10 + verify_timeout: 60 + kubernetes_provisioner: + drain_timeout: 3600 + drain_grace_period: 1800 + clear_labels_timeout: 1800 + remove_etcd_timeout: 1800 + etcd_ready_timeout: 600 + armada: + manifest: 'full-site' diff --git a/tests/unit/yaml_samples/deploymentConfiguration_minimal_valid.yaml b/tests/unit/yaml_samples/deploymentConfiguration_minimal_valid.yaml new file mode 100644 index 00000000..438152e2 --- /dev/null +++ b/tests/unit/yaml_samples/deploymentConfiguration_minimal_valid.yaml @@ -0,0 +1,12 @@ +--- +schema: shipyard/DeploymentConfiguration/v1 +metadata: + schema: metadata/Document/v1 + name: deployment-configuration + layeringDefinition: + abstract: false + layer: global + storagePolicy: cleartext +data: + armada: + manifest: 'full-site' diff --git a/tox.ini b/tox.ini index f0b9cbcf..f57ef260 100644 --- a/tox.ini +++ b/tox.ini @@ -1,13 +1,15 @@ [tox] -envlist = py35, pep8, coverage, bandit, docs +envlist = unit, pep8, coverage, bandit, docs [testenv] -deps = -r{toxinidir}/requirements.txt - -r{toxinidir}/test-requirements.txt setenv= PYTHONWARNING=all basepython=python3.5 -commands= +deps = -r{toxinidir}/requirements.txt + -r{toxinidir}/test-requirements.txt + +[testenv:unit] +commands = pytest \ {posargs} @@ -33,7 +35,11 @@ filename = *.py # NOTE(Bryan Strassner) ignoring F841 because of the airflow example pattern # of naming variables even if they aren't used for DAGs and Operators. # Doing so adds readability and context in this case. -ignore = F841 +# TODO(Bryan Strassner) The hacking rules defined as ignored below in many +# cases need to be un-ignored and fixed up. These are ignored because of +# the method in which test requirements bring in the hacking rules from +# other projects. +ignore = F841, H101, H201, H210, H238, H301, H304, H306, H401, H403, H404, H405 # NOTE(Bryan Strassner) excluding 3rd party and generated code that is brought into the # codebase. exclude = .venv,.git,.tox,build,dist,*plugins/rest_api_plugin.py,*lib/python*,*egg,alembic/env.py,docs @@ -47,9 +53,10 @@ commands = [testenv:coverage] commands = pytest \ + {posargs} \ --cov-branch \ - --cov-report term-missing:skip-covered \ - --cov-config .coveragerc \ + --cov-report=term-missing:skip-covered \ + --cov-config=.coveragerc \ --cov=shipyard_airflow \ --cov=shipyard_client \ --cov-report=html