From 4c2a3a5a7593ddad4b02cd7d3a2afbe7404442de Mon Sep 17 00:00:00 2001 From: Bryan Strassner Date: Fri, 11 Aug 2017 12:59:54 -0500 Subject: [PATCH] Add deploy site DAG skeleton reformatted using yapf updated to import so airflow could find things updated to use the right parameters to subdags updated to remove promenade steps fix some trailing whitespace changed for review comments rename files to properly use underscores instead of dashes Change-Id: I5bd5b6d76f2f98afbaffa62ed7d4a4d756bc3dfc --- shipyard_airflow/__init__.py | 13 -- shipyard_airflow/dags/__init__.py | 0 .../dags/dag_concurrency_check.py | 53 ++++++ shipyard_airflow/dags/deploy_site.py | 143 +++++++++++++++ shipyard_airflow/dags/preflight_checks.py | 166 ++++++++++++++++++ shipyard_airflow/dags/validate_site_design.py | 58 ++++++ shipyard_airflow/plugins/__init__.py | 0 shipyard_airflow/plugins/deckhand_operator.py | 44 +++++ .../plugins/placeholder_operator.py | 42 +++++ 9 files changed, 506 insertions(+), 13 deletions(-) create mode 100644 shipyard_airflow/dags/__init__.py create mode 100644 shipyard_airflow/dags/dag_concurrency_check.py create mode 100644 shipyard_airflow/dags/deploy_site.py create mode 100644 shipyard_airflow/dags/preflight_checks.py create mode 100644 shipyard_airflow/dags/validate_site_design.py create mode 100644 shipyard_airflow/plugins/__init__.py create mode 100644 shipyard_airflow/plugins/deckhand_operator.py create mode 100644 shipyard_airflow/plugins/placeholder_operator.py diff --git a/shipyard_airflow/__init__.py b/shipyard_airflow/__init__.py index f10bbbf6..e69de29b 100644 --- a/shipyard_airflow/__init__.py +++ b/shipyard_airflow/__init__.py @@ -1,13 +0,0 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/shipyard_airflow/dags/__init__.py b/shipyard_airflow/dags/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/shipyard_airflow/dags/dag_concurrency_check.py b/shipyard_airflow/dags/dag_concurrency_check.py new file mode 100644 index 00000000..7a2558f2 --- /dev/null +++ b/shipyard_airflow/dags/dag_concurrency_check.py @@ -0,0 +1,53 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import airflow +from airflow.models import DAG +from airflow.operators import PlaceholderOperator +from airflow.operators.dummy_operator import DummyOperator + + +def dag_concurrency_check(parent_dag_name, child_dag_name, args): + ''' + dag_concurrency_check is a sub-DAG that will will allow for a DAG to + determine if it is already running, and result in an error if so. + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + # TODO () Replace this operator with a real operator that will: + # 1) Look for an instance of the parent_dag_name running currently in + # airflow + # 2) Fail if the parent_dag_name is running + # 3) Succeed if there are no instances of parent_dag_name running + dag_concurrency_check_operator = PlaceholderOperator( + task_id='dag_concurrency_check', dag=dag) + + return dag + + +def dag_concurrency_check_failure_handler(parent_dag_name, child_dag_name, + args): + ''' + Peforms the actions necessary when concurrency checks fail + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + operator = DummyOperator( + task_id='dag_concurrency_check_failure_handler', dag=dag, ) + + return dag diff --git a/shipyard_airflow/dags/deploy_site.py b/shipyard_airflow/dags/deploy_site.py new file mode 100644 index 00000000..4e748a30 --- /dev/null +++ b/shipyard_airflow/dags/deploy_site.py @@ -0,0 +1,143 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime, timedelta + +import airflow +from airflow import DAG +from dag_concurrency_check import dag_concurrency_check +from dag_concurrency_check import dag_concurrency_check_failure_handler +from preflight_checks import all_preflight_checks +from preflight_checks import preflight_failure_handler +from validate_site_design import validate_site_design +from validate_site_design import validate_site_design_failure_handler +from airflow.operators.subdag_operator import SubDagOperator +from airflow.operators import DeckhandOperator +from airflow.operators import PlaceholderOperator +from airflow.utils.trigger_rule import TriggerRule +from datetime import datetime, timedelta + +''' +deploy_site is the top-level orchestration DAG for deploying a site using the +Undercloud platform. +''' + +PARENT_DAG_NAME = 'deploy_site' +DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check' +CONCURRENCY_FAILURE_DAG_NAME = 'concurrency_check_failure_handler' +ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight' +PREFLIGHT_FAILURE_DAG_NAME = 'preflight_failure_handler' +DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version' +VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design' +VALIDATION_FAILED_DAG_NAME = 'validate_site_design_failure_handler' +DECKHAND_MARK_LAST_KNOWN_GOOD = 'deckhand_mark_last_known_good' + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': airflow.utils.dates.days_ago(1), + 'email': [''], + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 0, + 'retry_delay': timedelta(minutes=1), +} + +dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None) + +concurrency_check = SubDagOperator( + subdag=dag_concurrency_check(PARENT_DAG_NAME, + DAG_CONCURRENCY_CHECK_DAG_NAME, + args=default_args), + task_id=DAG_CONCURRENCY_CHECK_DAG_NAME, + dag=dag, ) + +concurrency_check_failure_handler = SubDagOperator( + subdag=dag_concurrency_check_failure_handler( + PARENT_DAG_NAME, CONCURRENCY_FAILURE_DAG_NAME, + args=default_args), + task_id=CONCURRENCY_FAILURE_DAG_NAME, + trigger_rule=TriggerRule.ONE_FAILED, + dag=dag, ) + +preflight = SubDagOperator( + subdag=all_preflight_checks(PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME, + args=default_args), + task_id=ALL_PREFLIGHT_CHECKS_DAG_NAME, + dag=dag, ) + +preflight_failure = SubDagOperator( + subdag=preflight_failure_handler(PARENT_DAG_NAME, + PREFLIGHT_FAILURE_DAG_NAME, + args=default_args), + task_id=PREFLIGHT_FAILURE_DAG_NAME, + trigger_rule=TriggerRule.ONE_FAILED, + dag=dag, ) + +get_design_version = DeckhandOperator( + task_id=DECKHAND_GET_DESIGN_VERSION, dag=dag) + +validate_site_design = SubDagOperator( + subdag=validate_site_design(PARENT_DAG_NAME, VALIDATE_SITE_DESIGN_DAG_NAME, + args=default_args), + task_id=VALIDATE_SITE_DESIGN_DAG_NAME, + dag=dag) + +validate_site_design_failure = SubDagOperator( + subdag=validate_site_design_failure_handler( + dag.dag_id, VALIDATION_FAILED_DAG_NAME, + args=default_args), + task_id=VALIDATION_FAILED_DAG_NAME, + trigger_rule=TriggerRule.ONE_FAILED, + dag=dag) + +drydock_build = PlaceholderOperator(task_id='drydock_build', dag=dag) + +drydock_failure_handler = PlaceholderOperator( + task_id='drydock_failure_handler', + trigger_rule=TriggerRule.ONE_FAILED, + dag=dag) + +query_node_status = PlaceholderOperator( + task_id='deployed_node_status', dag=dag) + +nodes_not_healthy = PlaceholderOperator( + task_id='deployed_nodes_not_healthy', + trigger_rule=TriggerRule.ONE_FAILED, + dag=dag) + +armada_build = PlaceholderOperator(task_id='armada_build', dag=dag) + +armada_failure_handler = PlaceholderOperator( + task_id='armada_failure_handler', + trigger_rule=TriggerRule.ONE_FAILED, + dag=dag) + +mark_last_known_good = DeckhandOperator( + task_id=DECKHAND_MARK_LAST_KNOWN_GOOD, dag=dag) + +# DAG Wiring +concurrency_check_failure_handler.set_upstream(concurrency_check) +preflight.set_upstream(concurrency_check) +preflight_failure.set_upstream(preflight) +get_design_version.set_upstream(preflight) +validate_site_design.set_upstream(get_design_version) +validate_site_design_failure.set_upstream(validate_site_design) +drydock_build.set_upstream(validate_site_design) +drydock_failure_handler.set_upstream(drydock_build) +query_node_status.set_upstream(drydock_build) +nodes_not_healthy.set_upstream(query_node_status) +armada_build.set_upstream(query_node_status) +armada_failure_handler.set_upstream(armada_build) +mark_last_known_good.set_upstream(armada_build) diff --git a/shipyard_airflow/dags/preflight_checks.py b/shipyard_airflow/dags/preflight_checks.py new file mode 100644 index 00000000..dcd351c5 --- /dev/null +++ b/shipyard_airflow/dags/preflight_checks.py @@ -0,0 +1,166 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import airflow +from airflow.models import DAG +from airflow.operators.subdag_operator import SubDagOperator +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators import PlaceholderOperator + + +def k8s_preflight_check(parent_dag_name, child_dag_name, args): + ''' + The k8s_preflight_check checks that k8s is in a good state for + the purposes of the Undercloud Platform to proceed with processing + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + # TODO () Replace this operator with a real operator that will: + # 1) Ensure k8s is up and running. + # 2) Ensure that pods are not crashed + operator = PlaceholderOperator(task_id='k8s_preflight_check', dag=dag) + + return dag + + +def shipyard_preflight_check(parent_dag_name, child_dag_name, args): + ''' + Checks that shipyard is in a good state for + the purposes of the Undercloud Platform to proceed with processing + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + # TODO () Replace this operator with a real operator that will: + # 1) Ensure shipyard is up and running. + operator = PlaceholderOperator(task_id='shipyard_preflight_check', dag=dag) + + return dag + + +def deckhand_preflight_check(parent_dag_name, child_dag_name, args, ): + ''' + Checks that deckhand is in a good state for + the purposes of the Undercloud Platform to proceed with processing + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + # TODO () Replace this operator with a real operator that will: + # 1) Ensure deckhand is up and running. + operator = PlaceholderOperator(task_id='deckhand_preflight_check', dag=dag) + + return dag + + +def drydock_preflight_check(parent_dag_name, child_dag_name, args): + ''' + Checks that drydock is in a good state for + the purposes of the Undercloud Platform to proceed with processing + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + # TODO () Replace this operator with a real operator that will: + # 1) Ensure drydock is up and running. + operator = PlaceholderOperator(task_id='drydock_preflight_check', dag=dag) + + return dag + + +def armada_preflight_check(parent_dag_name, child_dag_name, args): + ''' + Checks that armada is in a good state for + the purposes of the Undercloud Platform to proceed with processing + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + # TODO () Replace this operator with a real operator that will: + # 1) Ensure armada is up and running. + operator = PlaceholderOperator(task_id='armada_preflight_check', dag=dag) + + return dag + + +# Names used for sub-subdags in the all preflight check subdag +K8S_PREFLIGHT_CHECK_DAG_NAME = 'k8s_preflight_check' +SHIPYARD_PREFLIGHT_CHECK_DAG_NAME = 'shipyard_preflight_check' +DECKHAND_PREFLIGHT_CHECK_DAG_NAME = 'deckhand_preflight_check' +DRYDOCK_PREFLIGHT_CHECK_DAG_NAME = 'drydock_preflight_check' +PROMENADE_PREFLIGHT_CHECK_DAG_NAME = 'promenade_preflight_check' +ARMADA_PREFLIGHT_CHECK_DAG_NAME = 'armada_preflight_check' + + +def all_preflight_checks(parent_dag_name, child_dag_name, args): + ''' + puts all of the preflight checks into an atomic unit. + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + k8s = SubDagOperator( + subdag=k8s_preflight_check(dag.dag_id, K8S_PREFLIGHT_CHECK_DAG_NAME, + args), + task_id=K8S_PREFLIGHT_CHECK_DAG_NAME, + dag=dag, ) + + shipyard = SubDagOperator( + subdag=shipyard_preflight_check(dag.dag_id, + SHIPYARD_PREFLIGHT_CHECK_DAG_NAME, + args), + task_id=SHIPYARD_PREFLIGHT_CHECK_DAG_NAME, + dag=dag, ) + + deckhand = SubDagOperator( + subdag=deckhand_preflight_check(dag.dag_id, + DECKHAND_PREFLIGHT_CHECK_DAG_NAME, + args), + task_id=DECKHAND_PREFLIGHT_CHECK_DAG_NAME, + dag=dag, ) + + drydock = SubDagOperator( + subdag=drydock_preflight_check(dag.dag_id, + DRYDOCK_PREFLIGHT_CHECK_DAG_NAME, + args), + task_id=DRYDOCK_PREFLIGHT_CHECK_DAG_NAME, + dag=dag, ) + + armada = SubDagOperator( + subdag=armada_preflight_check( + dag.dag_id, ARMADA_PREFLIGHT_CHECK_DAG_NAME, args), + task_id=ARMADA_PREFLIGHT_CHECK_DAG_NAME, + dag=dag, ) + + return dag + + +def preflight_failure_handler(parent_dag_name, child_dag_name, args): + ''' + Peforms the actions necessary when preflight checks fail + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + operator = DummyOperator(task_id='preflight_failure_handler', dag=dag) + + return dag diff --git a/shipyard_airflow/dags/validate_site_design.py b/shipyard_airflow/dags/validate_site_design.py new file mode 100644 index 00000000..0a117548 --- /dev/null +++ b/shipyard_airflow/dags/validate_site_design.py @@ -0,0 +1,58 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import airflow +from airflow.models import DAG +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.subdag_operator import SubDagOperator +from airflow.utils.trigger_rule import TriggerRule +from airflow.operators import DeckhandOperator +from airflow.operators import PlaceholderOperator + + +def validate_site_design_failure_handler(parent_dag_name, child_dag_name, + args): + ''' + Peforms the actions necessary when any of the site design checks fail + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + operator = DummyOperator( + task_id='site_design_validation_failure_handler', dag=dag) + + return dag + + +def validate_site_design(parent_dag_name, child_dag_name, args): + ''' + Subdag to delegate design verification to the UCP components + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + deckhand_validate_docs = DeckhandOperator( + task_id='deckhand_validate_site_design', dag=dag) + + #TODO () use the real operator here + drydock_validate_docs = PlaceholderOperator( + task_id='drydock_validate_site_design', dag=dag) + + #TODO () use the real operator here + armada_validate_docs = PlaceholderOperator( + task_id='armada_validate_site_design', dag=dag) + + return dag diff --git a/shipyard_airflow/plugins/__init__.py b/shipyard_airflow/plugins/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/shipyard_airflow/plugins/deckhand_operator.py b/shipyard_airflow/plugins/deckhand_operator.py new file mode 100644 index 00000000..e4fdee18 --- /dev/null +++ b/shipyard_airflow/plugins/deckhand_operator.py @@ -0,0 +1,44 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults +from airflow.plugins_manager import AirflowPlugin + + +class DeckhandOperator(BaseOperator): + """ + Supports interaction with Deckhand. + """ + + #TODO () remove this special coloring when the operator is done. + ui_color = '#e8f7e4' + + @apply_defaults + def __init__(self, *args, **kwargs): + super(DeckhandOperator, self).__init__(*args, **kwargs) + + # TODO () make this communicate with Deckhand. + # Needs to expose functionality so general interaction + # with deckhand can occur. + def execute(self, context): + logging.info('%s : %s !!! not implemented. ' + 'Need to get design revision from Deckhand', + self.dag.dag_id, self.task_id) + + +class DeckhandOperatorPlugin(AirflowPlugin): + name = 'deckhand_operator_plugin' + operators = [DeckhandOperator] diff --git a/shipyard_airflow/plugins/placeholder_operator.py b/shipyard_airflow/plugins/placeholder_operator.py new file mode 100644 index 00000000..504bc043 --- /dev/null +++ b/shipyard_airflow/plugins/placeholder_operator.py @@ -0,0 +1,42 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults +from airflow.plugins_manager import AirflowPlugin + + +class PlaceholderOperator(BaseOperator): + """ + Operator that writes a log of its presence, as not implemented. + This is intended to be a little noisy so it's easy to see what's + missing. + """ + + template_fields = tuple() + ui_color = '#e8f7e4' + + @apply_defaults + def __init__(self, *args, **kwargs): + super(PlaceholderOperator, self).__init__(*args, **kwargs) + + def execute(self, context): + logging.info('%s : %s !!! not implemented', self.dag.dag_id, + self.task_id) + + +class PlaceholderOperatorPlugin(AirflowPlugin): + name = "placeholder_operator_plugin" + operators = [PlaceholderOperator]