diff --git a/shipyard_airflow/__init__.py b/shipyard_airflow/__init__.py index f10bbbf6..e69de29b 100644 --- a/shipyard_airflow/__init__.py +++ b/shipyard_airflow/__init__.py @@ -1,13 +0,0 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/shipyard_airflow/dags/__init__.py b/shipyard_airflow/dags/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/shipyard_airflow/dags/dag_concurrency_check.py b/shipyard_airflow/dags/dag_concurrency_check.py new file mode 100644 index 00000000..7a2558f2 --- /dev/null +++ b/shipyard_airflow/dags/dag_concurrency_check.py @@ -0,0 +1,53 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import airflow +from airflow.models import DAG +from airflow.operators import PlaceholderOperator +from airflow.operators.dummy_operator import DummyOperator + + +def dag_concurrency_check(parent_dag_name, child_dag_name, args): + ''' + dag_concurrency_check is a sub-DAG that will will allow for a DAG to + determine if it is already running, and result in an error if so. + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + # TODO () Replace this operator with a real operator that will: + # 1) Look for an instance of the parent_dag_name running currently in + # airflow + # 2) Fail if the parent_dag_name is running + # 3) Succeed if there are no instances of parent_dag_name running + dag_concurrency_check_operator = PlaceholderOperator( + task_id='dag_concurrency_check', dag=dag) + + return dag + + +def dag_concurrency_check_failure_handler(parent_dag_name, child_dag_name, + args): + ''' + Peforms the actions necessary when concurrency checks fail + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + operator = DummyOperator( + task_id='dag_concurrency_check_failure_handler', dag=dag, ) + + return dag diff --git a/shipyard_airflow/dags/deploy_site.py b/shipyard_airflow/dags/deploy_site.py new file mode 100644 index 00000000..4e748a30 --- /dev/null +++ b/shipyard_airflow/dags/deploy_site.py @@ -0,0 +1,143 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime, timedelta + +import airflow +from airflow import DAG +from dag_concurrency_check import dag_concurrency_check +from dag_concurrency_check import dag_concurrency_check_failure_handler +from preflight_checks import all_preflight_checks +from preflight_checks import preflight_failure_handler +from validate_site_design import validate_site_design +from validate_site_design import validate_site_design_failure_handler +from airflow.operators.subdag_operator import SubDagOperator +from airflow.operators import DeckhandOperator +from airflow.operators import PlaceholderOperator +from airflow.utils.trigger_rule import TriggerRule +from datetime import datetime, timedelta + +''' +deploy_site is the top-level orchestration DAG for deploying a site using the +Undercloud platform. +''' + +PARENT_DAG_NAME = 'deploy_site' +DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check' +CONCURRENCY_FAILURE_DAG_NAME = 'concurrency_check_failure_handler' +ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight' +PREFLIGHT_FAILURE_DAG_NAME = 'preflight_failure_handler' +DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version' +VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design' +VALIDATION_FAILED_DAG_NAME = 'validate_site_design_failure_handler' +DECKHAND_MARK_LAST_KNOWN_GOOD = 'deckhand_mark_last_known_good' + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': airflow.utils.dates.days_ago(1), + 'email': [''], + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 0, + 'retry_delay': timedelta(minutes=1), +} + +dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None) + +concurrency_check = SubDagOperator( + subdag=dag_concurrency_check(PARENT_DAG_NAME, + DAG_CONCURRENCY_CHECK_DAG_NAME, + args=default_args), + task_id=DAG_CONCURRENCY_CHECK_DAG_NAME, + dag=dag, ) + +concurrency_check_failure_handler = SubDagOperator( + subdag=dag_concurrency_check_failure_handler( + PARENT_DAG_NAME, CONCURRENCY_FAILURE_DAG_NAME, + args=default_args), + task_id=CONCURRENCY_FAILURE_DAG_NAME, + trigger_rule=TriggerRule.ONE_FAILED, + dag=dag, ) + +preflight = SubDagOperator( + subdag=all_preflight_checks(PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME, + args=default_args), + task_id=ALL_PREFLIGHT_CHECKS_DAG_NAME, + dag=dag, ) + +preflight_failure = SubDagOperator( + subdag=preflight_failure_handler(PARENT_DAG_NAME, + PREFLIGHT_FAILURE_DAG_NAME, + args=default_args), + task_id=PREFLIGHT_FAILURE_DAG_NAME, + trigger_rule=TriggerRule.ONE_FAILED, + dag=dag, ) + +get_design_version = DeckhandOperator( + task_id=DECKHAND_GET_DESIGN_VERSION, dag=dag) + +validate_site_design = SubDagOperator( + subdag=validate_site_design(PARENT_DAG_NAME, VALIDATE_SITE_DESIGN_DAG_NAME, + args=default_args), + task_id=VALIDATE_SITE_DESIGN_DAG_NAME, + dag=dag) + +validate_site_design_failure = SubDagOperator( + subdag=validate_site_design_failure_handler( + dag.dag_id, VALIDATION_FAILED_DAG_NAME, + args=default_args), + task_id=VALIDATION_FAILED_DAG_NAME, + trigger_rule=TriggerRule.ONE_FAILED, + dag=dag) + +drydock_build = PlaceholderOperator(task_id='drydock_build', dag=dag) + +drydock_failure_handler = PlaceholderOperator( + task_id='drydock_failure_handler', + trigger_rule=TriggerRule.ONE_FAILED, + dag=dag) + +query_node_status = PlaceholderOperator( + task_id='deployed_node_status', dag=dag) + +nodes_not_healthy = PlaceholderOperator( + task_id='deployed_nodes_not_healthy', + trigger_rule=TriggerRule.ONE_FAILED, + dag=dag) + +armada_build = PlaceholderOperator(task_id='armada_build', dag=dag) + +armada_failure_handler = PlaceholderOperator( + task_id='armada_failure_handler', + trigger_rule=TriggerRule.ONE_FAILED, + dag=dag) + +mark_last_known_good = DeckhandOperator( + task_id=DECKHAND_MARK_LAST_KNOWN_GOOD, dag=dag) + +# DAG Wiring +concurrency_check_failure_handler.set_upstream(concurrency_check) +preflight.set_upstream(concurrency_check) +preflight_failure.set_upstream(preflight) +get_design_version.set_upstream(preflight) +validate_site_design.set_upstream(get_design_version) +validate_site_design_failure.set_upstream(validate_site_design) +drydock_build.set_upstream(validate_site_design) +drydock_failure_handler.set_upstream(drydock_build) +query_node_status.set_upstream(drydock_build) +nodes_not_healthy.set_upstream(query_node_status) +armada_build.set_upstream(query_node_status) +armada_failure_handler.set_upstream(armada_build) +mark_last_known_good.set_upstream(armada_build) diff --git a/shipyard_airflow/dags/preflight_checks.py b/shipyard_airflow/dags/preflight_checks.py new file mode 100644 index 00000000..dcd351c5 --- /dev/null +++ b/shipyard_airflow/dags/preflight_checks.py @@ -0,0 +1,166 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import airflow +from airflow.models import DAG +from airflow.operators.subdag_operator import SubDagOperator +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators import PlaceholderOperator + + +def k8s_preflight_check(parent_dag_name, child_dag_name, args): + ''' + The k8s_preflight_check checks that k8s is in a good state for + the purposes of the Undercloud Platform to proceed with processing + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + # TODO () Replace this operator with a real operator that will: + # 1) Ensure k8s is up and running. + # 2) Ensure that pods are not crashed + operator = PlaceholderOperator(task_id='k8s_preflight_check', dag=dag) + + return dag + + +def shipyard_preflight_check(parent_dag_name, child_dag_name, args): + ''' + Checks that shipyard is in a good state for + the purposes of the Undercloud Platform to proceed with processing + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + # TODO () Replace this operator with a real operator that will: + # 1) Ensure shipyard is up and running. + operator = PlaceholderOperator(task_id='shipyard_preflight_check', dag=dag) + + return dag + + +def deckhand_preflight_check(parent_dag_name, child_dag_name, args, ): + ''' + Checks that deckhand is in a good state for + the purposes of the Undercloud Platform to proceed with processing + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + # TODO () Replace this operator with a real operator that will: + # 1) Ensure deckhand is up and running. + operator = PlaceholderOperator(task_id='deckhand_preflight_check', dag=dag) + + return dag + + +def drydock_preflight_check(parent_dag_name, child_dag_name, args): + ''' + Checks that drydock is in a good state for + the purposes of the Undercloud Platform to proceed with processing + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + # TODO () Replace this operator with a real operator that will: + # 1) Ensure drydock is up and running. + operator = PlaceholderOperator(task_id='drydock_preflight_check', dag=dag) + + return dag + + +def armada_preflight_check(parent_dag_name, child_dag_name, args): + ''' + Checks that armada is in a good state for + the purposes of the Undercloud Platform to proceed with processing + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + # TODO () Replace this operator with a real operator that will: + # 1) Ensure armada is up and running. + operator = PlaceholderOperator(task_id='armada_preflight_check', dag=dag) + + return dag + + +# Names used for sub-subdags in the all preflight check subdag +K8S_PREFLIGHT_CHECK_DAG_NAME = 'k8s_preflight_check' +SHIPYARD_PREFLIGHT_CHECK_DAG_NAME = 'shipyard_preflight_check' +DECKHAND_PREFLIGHT_CHECK_DAG_NAME = 'deckhand_preflight_check' +DRYDOCK_PREFLIGHT_CHECK_DAG_NAME = 'drydock_preflight_check' +PROMENADE_PREFLIGHT_CHECK_DAG_NAME = 'promenade_preflight_check' +ARMADA_PREFLIGHT_CHECK_DAG_NAME = 'armada_preflight_check' + + +def all_preflight_checks(parent_dag_name, child_dag_name, args): + ''' + puts all of the preflight checks into an atomic unit. + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + k8s = SubDagOperator( + subdag=k8s_preflight_check(dag.dag_id, K8S_PREFLIGHT_CHECK_DAG_NAME, + args), + task_id=K8S_PREFLIGHT_CHECK_DAG_NAME, + dag=dag, ) + + shipyard = SubDagOperator( + subdag=shipyard_preflight_check(dag.dag_id, + SHIPYARD_PREFLIGHT_CHECK_DAG_NAME, + args), + task_id=SHIPYARD_PREFLIGHT_CHECK_DAG_NAME, + dag=dag, ) + + deckhand = SubDagOperator( + subdag=deckhand_preflight_check(dag.dag_id, + DECKHAND_PREFLIGHT_CHECK_DAG_NAME, + args), + task_id=DECKHAND_PREFLIGHT_CHECK_DAG_NAME, + dag=dag, ) + + drydock = SubDagOperator( + subdag=drydock_preflight_check(dag.dag_id, + DRYDOCK_PREFLIGHT_CHECK_DAG_NAME, + args), + task_id=DRYDOCK_PREFLIGHT_CHECK_DAG_NAME, + dag=dag, ) + + armada = SubDagOperator( + subdag=armada_preflight_check( + dag.dag_id, ARMADA_PREFLIGHT_CHECK_DAG_NAME, args), + task_id=ARMADA_PREFLIGHT_CHECK_DAG_NAME, + dag=dag, ) + + return dag + + +def preflight_failure_handler(parent_dag_name, child_dag_name, args): + ''' + Peforms the actions necessary when preflight checks fail + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + operator = DummyOperator(task_id='preflight_failure_handler', dag=dag) + + return dag diff --git a/shipyard_airflow/dags/validate_site_design.py b/shipyard_airflow/dags/validate_site_design.py new file mode 100644 index 00000000..0a117548 --- /dev/null +++ b/shipyard_airflow/dags/validate_site_design.py @@ -0,0 +1,58 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import airflow +from airflow.models import DAG +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.subdag_operator import SubDagOperator +from airflow.utils.trigger_rule import TriggerRule +from airflow.operators import DeckhandOperator +from airflow.operators import PlaceholderOperator + + +def validate_site_design_failure_handler(parent_dag_name, child_dag_name, + args): + ''' + Peforms the actions necessary when any of the site design checks fail + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + operator = DummyOperator( + task_id='site_design_validation_failure_handler', dag=dag) + + return dag + + +def validate_site_design(parent_dag_name, child_dag_name, args): + ''' + Subdag to delegate design verification to the UCP components + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + deckhand_validate_docs = DeckhandOperator( + task_id='deckhand_validate_site_design', dag=dag) + + #TODO () use the real operator here + drydock_validate_docs = PlaceholderOperator( + task_id='drydock_validate_site_design', dag=dag) + + #TODO () use the real operator here + armada_validate_docs = PlaceholderOperator( + task_id='armada_validate_site_design', dag=dag) + + return dag diff --git a/shipyard_airflow/plugins/__init__.py b/shipyard_airflow/plugins/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/shipyard_airflow/plugins/deckhand_operator.py b/shipyard_airflow/plugins/deckhand_operator.py new file mode 100644 index 00000000..e4fdee18 --- /dev/null +++ b/shipyard_airflow/plugins/deckhand_operator.py @@ -0,0 +1,44 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults +from airflow.plugins_manager import AirflowPlugin + + +class DeckhandOperator(BaseOperator): + """ + Supports interaction with Deckhand. + """ + + #TODO () remove this special coloring when the operator is done. + ui_color = '#e8f7e4' + + @apply_defaults + def __init__(self, *args, **kwargs): + super(DeckhandOperator, self).__init__(*args, **kwargs) + + # TODO () make this communicate with Deckhand. + # Needs to expose functionality so general interaction + # with deckhand can occur. + def execute(self, context): + logging.info('%s : %s !!! not implemented. ' + 'Need to get design revision from Deckhand', + self.dag.dag_id, self.task_id) + + +class DeckhandOperatorPlugin(AirflowPlugin): + name = 'deckhand_operator_plugin' + operators = [DeckhandOperator] diff --git a/shipyard_airflow/plugins/placeholder_operator.py b/shipyard_airflow/plugins/placeholder_operator.py new file mode 100644 index 00000000..504bc043 --- /dev/null +++ b/shipyard_airflow/plugins/placeholder_operator.py @@ -0,0 +1,42 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults +from airflow.plugins_manager import AirflowPlugin + + +class PlaceholderOperator(BaseOperator): + """ + Operator that writes a log of its presence, as not implemented. + This is intended to be a little noisy so it's easy to see what's + missing. + """ + + template_fields = tuple() + ui_color = '#e8f7e4' + + @apply_defaults + def __init__(self, *args, **kwargs): + super(PlaceholderOperator, self).__init__(*args, **kwargs) + + def execute(self, context): + logging.info('%s : %s !!! not implemented', self.dag.dag_id, + self.task_id) + + +class PlaceholderOperatorPlugin(AirflowPlugin): + name = "placeholder_operator_plugin" + operators = [PlaceholderOperator]