Merge "Add deploy site DAG skeleton"

This commit is contained in:
Felipe Monteiro 2017-08-16 15:08:01 -04:00 committed by Gerrit Code Review
commit a889877524
9 changed files with 506 additions and 13 deletions

View File

@ -1,13 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

View File

@ -0,0 +1,53 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import airflow
from airflow.models import DAG
from airflow.operators import PlaceholderOperator
from airflow.operators.dummy_operator import DummyOperator
def dag_concurrency_check(parent_dag_name, child_dag_name, args):
'''
dag_concurrency_check is a sub-DAG that will will allow for a DAG to
determine if it is already running, and result in an error if so.
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
# TODO () Replace this operator with a real operator that will:
# 1) Look for an instance of the parent_dag_name running currently in
# airflow
# 2) Fail if the parent_dag_name is running
# 3) Succeed if there are no instances of parent_dag_name running
dag_concurrency_check_operator = PlaceholderOperator(
task_id='dag_concurrency_check', dag=dag)
return dag
def dag_concurrency_check_failure_handler(parent_dag_name, child_dag_name,
args):
'''
Peforms the actions necessary when concurrency checks fail
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DummyOperator(
task_id='dag_concurrency_check_failure_handler', dag=dag, )
return dag

View File

@ -0,0 +1,143 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime, timedelta
import airflow
from airflow import DAG
from dag_concurrency_check import dag_concurrency_check
from dag_concurrency_check import dag_concurrency_check_failure_handler
from preflight_checks import all_preflight_checks
from preflight_checks import preflight_failure_handler
from validate_site_design import validate_site_design
from validate_site_design import validate_site_design_failure_handler
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators import DeckhandOperator
from airflow.operators import PlaceholderOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta
'''
deploy_site is the top-level orchestration DAG for deploying a site using the
Undercloud platform.
'''
PARENT_DAG_NAME = 'deploy_site'
DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check'
CONCURRENCY_FAILURE_DAG_NAME = 'concurrency_check_failure_handler'
ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight'
PREFLIGHT_FAILURE_DAG_NAME = 'preflight_failure_handler'
DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version'
VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design'
VALIDATION_FAILED_DAG_NAME = 'validate_site_design_failure_handler'
DECKHAND_MARK_LAST_KNOWN_GOOD = 'deckhand_mark_last_known_good'
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(1),
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=1),
}
dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None)
concurrency_check = SubDagOperator(
subdag=dag_concurrency_check(PARENT_DAG_NAME,
DAG_CONCURRENCY_CHECK_DAG_NAME,
args=default_args),
task_id=DAG_CONCURRENCY_CHECK_DAG_NAME,
dag=dag, )
concurrency_check_failure_handler = SubDagOperator(
subdag=dag_concurrency_check_failure_handler(
PARENT_DAG_NAME, CONCURRENCY_FAILURE_DAG_NAME,
args=default_args),
task_id=CONCURRENCY_FAILURE_DAG_NAME,
trigger_rule=TriggerRule.ONE_FAILED,
dag=dag, )
preflight = SubDagOperator(
subdag=all_preflight_checks(PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME,
args=default_args),
task_id=ALL_PREFLIGHT_CHECKS_DAG_NAME,
dag=dag, )
preflight_failure = SubDagOperator(
subdag=preflight_failure_handler(PARENT_DAG_NAME,
PREFLIGHT_FAILURE_DAG_NAME,
args=default_args),
task_id=PREFLIGHT_FAILURE_DAG_NAME,
trigger_rule=TriggerRule.ONE_FAILED,
dag=dag, )
get_design_version = DeckhandOperator(
task_id=DECKHAND_GET_DESIGN_VERSION, dag=dag)
validate_site_design = SubDagOperator(
subdag=validate_site_design(PARENT_DAG_NAME, VALIDATE_SITE_DESIGN_DAG_NAME,
args=default_args),
task_id=VALIDATE_SITE_DESIGN_DAG_NAME,
dag=dag)
validate_site_design_failure = SubDagOperator(
subdag=validate_site_design_failure_handler(
dag.dag_id, VALIDATION_FAILED_DAG_NAME,
args=default_args),
task_id=VALIDATION_FAILED_DAG_NAME,
trigger_rule=TriggerRule.ONE_FAILED,
dag=dag)
drydock_build = PlaceholderOperator(task_id='drydock_build', dag=dag)
drydock_failure_handler = PlaceholderOperator(
task_id='drydock_failure_handler',
trigger_rule=TriggerRule.ONE_FAILED,
dag=dag)
query_node_status = PlaceholderOperator(
task_id='deployed_node_status', dag=dag)
nodes_not_healthy = PlaceholderOperator(
task_id='deployed_nodes_not_healthy',
trigger_rule=TriggerRule.ONE_FAILED,
dag=dag)
armada_build = PlaceholderOperator(task_id='armada_build', dag=dag)
armada_failure_handler = PlaceholderOperator(
task_id='armada_failure_handler',
trigger_rule=TriggerRule.ONE_FAILED,
dag=dag)
mark_last_known_good = DeckhandOperator(
task_id=DECKHAND_MARK_LAST_KNOWN_GOOD, dag=dag)
# DAG Wiring
concurrency_check_failure_handler.set_upstream(concurrency_check)
preflight.set_upstream(concurrency_check)
preflight_failure.set_upstream(preflight)
get_design_version.set_upstream(preflight)
validate_site_design.set_upstream(get_design_version)
validate_site_design_failure.set_upstream(validate_site_design)
drydock_build.set_upstream(validate_site_design)
drydock_failure_handler.set_upstream(drydock_build)
query_node_status.set_upstream(drydock_build)
nodes_not_healthy.set_upstream(query_node_status)
armada_build.set_upstream(query_node_status)
armada_failure_handler.set_upstream(armada_build)
mark_last_known_good.set_upstream(armada_build)

View File

@ -0,0 +1,166 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import airflow
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import PlaceholderOperator
def k8s_preflight_check(parent_dag_name, child_dag_name, args):
'''
The k8s_preflight_check checks that k8s is in a good state for
the purposes of the Undercloud Platform to proceed with processing
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
# TODO () Replace this operator with a real operator that will:
# 1) Ensure k8s is up and running.
# 2) Ensure that pods are not crashed
operator = PlaceholderOperator(task_id='k8s_preflight_check', dag=dag)
return dag
def shipyard_preflight_check(parent_dag_name, child_dag_name, args):
'''
Checks that shipyard is in a good state for
the purposes of the Undercloud Platform to proceed with processing
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
# TODO () Replace this operator with a real operator that will:
# 1) Ensure shipyard is up and running.
operator = PlaceholderOperator(task_id='shipyard_preflight_check', dag=dag)
return dag
def deckhand_preflight_check(parent_dag_name, child_dag_name, args, ):
'''
Checks that deckhand is in a good state for
the purposes of the Undercloud Platform to proceed with processing
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
# TODO () Replace this operator with a real operator that will:
# 1) Ensure deckhand is up and running.
operator = PlaceholderOperator(task_id='deckhand_preflight_check', dag=dag)
return dag
def drydock_preflight_check(parent_dag_name, child_dag_name, args):
'''
Checks that drydock is in a good state for
the purposes of the Undercloud Platform to proceed with processing
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
# TODO () Replace this operator with a real operator that will:
# 1) Ensure drydock is up and running.
operator = PlaceholderOperator(task_id='drydock_preflight_check', dag=dag)
return dag
def armada_preflight_check(parent_dag_name, child_dag_name, args):
'''
Checks that armada is in a good state for
the purposes of the Undercloud Platform to proceed with processing
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
# TODO () Replace this operator with a real operator that will:
# 1) Ensure armada is up and running.
operator = PlaceholderOperator(task_id='armada_preflight_check', dag=dag)
return dag
# Names used for sub-subdags in the all preflight check subdag
K8S_PREFLIGHT_CHECK_DAG_NAME = 'k8s_preflight_check'
SHIPYARD_PREFLIGHT_CHECK_DAG_NAME = 'shipyard_preflight_check'
DECKHAND_PREFLIGHT_CHECK_DAG_NAME = 'deckhand_preflight_check'
DRYDOCK_PREFLIGHT_CHECK_DAG_NAME = 'drydock_preflight_check'
PROMENADE_PREFLIGHT_CHECK_DAG_NAME = 'promenade_preflight_check'
ARMADA_PREFLIGHT_CHECK_DAG_NAME = 'armada_preflight_check'
def all_preflight_checks(parent_dag_name, child_dag_name, args):
'''
puts all of the preflight checks into an atomic unit.
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
k8s = SubDagOperator(
subdag=k8s_preflight_check(dag.dag_id, K8S_PREFLIGHT_CHECK_DAG_NAME,
args),
task_id=K8S_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, )
shipyard = SubDagOperator(
subdag=shipyard_preflight_check(dag.dag_id,
SHIPYARD_PREFLIGHT_CHECK_DAG_NAME,
args),
task_id=SHIPYARD_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, )
deckhand = SubDagOperator(
subdag=deckhand_preflight_check(dag.dag_id,
DECKHAND_PREFLIGHT_CHECK_DAG_NAME,
args),
task_id=DECKHAND_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, )
drydock = SubDagOperator(
subdag=drydock_preflight_check(dag.dag_id,
DRYDOCK_PREFLIGHT_CHECK_DAG_NAME,
args),
task_id=DRYDOCK_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, )
armada = SubDagOperator(
subdag=armada_preflight_check(
dag.dag_id, ARMADA_PREFLIGHT_CHECK_DAG_NAME, args),
task_id=ARMADA_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, )
return dag
def preflight_failure_handler(parent_dag_name, child_dag_name, args):
'''
Peforms the actions necessary when preflight checks fail
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DummyOperator(task_id='preflight_failure_handler', dag=dag)
return dag

View File

@ -0,0 +1,58 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators import DeckhandOperator
from airflow.operators import PlaceholderOperator
def validate_site_design_failure_handler(parent_dag_name, child_dag_name,
args):
'''
Peforms the actions necessary when any of the site design checks fail
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DummyOperator(
task_id='site_design_validation_failure_handler', dag=dag)
return dag
def validate_site_design(parent_dag_name, child_dag_name, args):
'''
Subdag to delegate design verification to the UCP components
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
deckhand_validate_docs = DeckhandOperator(
task_id='deckhand_validate_site_design', dag=dag)
#TODO () use the real operator here
drydock_validate_docs = PlaceholderOperator(
task_id='drydock_validate_site_design', dag=dag)
#TODO () use the real operator here
armada_validate_docs = PlaceholderOperator(
task_id='armada_validate_site_design', dag=dag)
return dag

View File

View File

@ -0,0 +1,44 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.plugins_manager import AirflowPlugin
class DeckhandOperator(BaseOperator):
"""
Supports interaction with Deckhand.
"""
#TODO () remove this special coloring when the operator is done.
ui_color = '#e8f7e4'
@apply_defaults
def __init__(self, *args, **kwargs):
super(DeckhandOperator, self).__init__(*args, **kwargs)
# TODO () make this communicate with Deckhand.
# Needs to expose functionality so general interaction
# with deckhand can occur.
def execute(self, context):
logging.info('%s : %s !!! not implemented. '
'Need to get design revision from Deckhand',
self.dag.dag_id, self.task_id)
class DeckhandOperatorPlugin(AirflowPlugin):
name = 'deckhand_operator_plugin'
operators = [DeckhandOperator]

View File

@ -0,0 +1,42 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.plugins_manager import AirflowPlugin
class PlaceholderOperator(BaseOperator):
"""
Operator that writes a log of its presence, as not implemented.
This is intended to be a little noisy so it's easy to see what's
missing.
"""
template_fields = tuple()
ui_color = '#e8f7e4'
@apply_defaults
def __init__(self, *args, **kwargs):
super(PlaceholderOperator, self).__init__(*args, **kwargs)
def execute(self, context):
logging.info('%s : %s !!! not implemented', self.dag.dag_id,
self.task_id)
class PlaceholderOperatorPlugin(AirflowPlugin):
name = "placeholder_operator_plugin"
operators = [PlaceholderOperator]