Collapse Extra Subdag Layer Across Dags

We will collapse the extra subdag layer which is not really
needed for functionalities. This will make the dags leaner and
easier to control.

Corresponding changes have been made to the Operators as well

Change-Id: I84135a15357abf902d93f3cb085c7c9a21c38b7d
This commit is contained in:
Anthony Lin 2017-12-14 07:24:18 +00:00
parent 22315bb357
commit 4de8c00830
8 changed files with 113 additions and 265 deletions

View File

@ -16,6 +16,8 @@ from airflow.models import DAG
from airflow.operators import ArmadaOperator
# Location of shiyard.conf
# Note that the shipyard.conf file needs to be placed on a volume
# that can be accessed by the containers
config_path = '/usr/local/airflow/plugins/shipyard.conf'
@ -32,6 +34,8 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args):
task_id='create_armada_client',
shipyard_conf=config_path,
action='create_armada_client',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Get Tiller Status
@ -39,6 +43,8 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args):
task_id='armada_status',
shipyard_conf=config_path,
action='armada_status',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Validate Armada YAMLs
@ -46,6 +52,8 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args):
task_id='armada_validate',
shipyard_conf=config_path,
action='armada_validate',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Armada Apply
@ -53,6 +61,8 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args):
task_id='armada_apply',
shipyard_conf=config_path,
action='armada_apply',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
retries=10,
dag=dag)
@ -61,6 +71,8 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args):
task_id='armada_get_releases',
shipyard_conf=config_path,
action='armada_get_releases',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Define dependencies

View File

@ -14,14 +14,15 @@
from airflow.models import DAG
from airflow.operators import DeckhandOperator
from airflow.operators.subdag_operator import SubDagOperator
# Location of shiyard.conf
# Note that the shipyard.conf file needs to be placed on a volume
# that can be accessed by the containers
config_path = '/usr/local/airflow/plugins/shipyard.conf'
def get_design_version(parent_dag_name, child_dag_name, args):
def get_design_deckhand(parent_dag_name, child_dag_name, args):
'''
Get Deckhand Design Version
'''
@ -29,39 +30,12 @@ def get_design_version(parent_dag_name, child_dag_name, args):
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args)
# Note that in the event where the 'deploy_site' Action is
# triggered from Shipyard, the 'parent_dag_name' variable
# gets assigned with 'deploy_site.deckhand_get_design_version'.
# This is the name that we want to assign to the subdag so
# that we can reference it for xcom. The name of the main
# dag will be the front part of that value, i.e. 'deploy_site'.
# Hence we will extract the front part and assign it to main_dag.
# We will reuse this pattern for other Actions, e.g. update_site,
# redeploy_site as well.
operator = DeckhandOperator(
task_id=child_dag_name,
deckhand_design = DeckhandOperator(
task_id='deckhand_get_design_version',
shipyard_conf=config_path,
action=child_dag_name,
main_dag_name=parent_dag_name[0:parent_dag_name.find('.')],
sub_dag_name=parent_dag_name,
dag=dag)
return dag
def get_design_deckhand(parent_dag_name, child_dag_name, args):
'''
Puts into atomic unit
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args)
deckhand_design = SubDagOperator(
subdag=get_design_version(dag.dag_id,
child_dag_name,
args),
task_id=child_dag_name,
action='deckhand_get_design_version',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
return dag

View File

@ -13,92 +13,64 @@
# limitations under the License.
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators import DryDockOperator
# Location of shiyard.conf
# Note that the shipyard.conf file needs to be placed on a volume
# that can be accessed by the containers
config_path = '/usr/local/airflow/plugins/shipyard.conf'
# Names used for sub-subdags in the drydock site deployment subdag
CREATE_DRYDOCK_CLIENT_DAG_NAME = 'create_drydock_client'
DRYDOCK_VERIFY_SITE_DAG_NAME = 'verify_site'
DRYDOCK_PREPARE_SITE_DAG_NAME = 'prepare_site'
DRYDOCK_PREPARE_NODE_DAG_NAME = 'prepare_nodes'
DRYDOCK_DEPLOY_NODE_DAG_NAME = 'deploy_nodes'
def get_drydock_subdag_step(parent_dag_name, child_dag_name, args):
'''
Execute DryDock Subdag
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args)
# Note that in the event where the 'deploy_site' action is
# triggered from Shipyard, the 'parent_dag_name' variable
# gets assigned with 'deploy_site.create_drydock_client'.
# This is the name that we want to assign to the subdag so
# that we can reference it for xcom. The name of the main
# dag will be the front part of that value, i.e. 'deploy_site'.
# Hence we will extract the front part and assign it to main_dag.
# We will reuse this pattern for other Actions, e.g. update_site,
# redeploy_site as well.
operator = DryDockOperator(
task_id=child_dag_name,
shipyard_conf=config_path,
action=child_dag_name,
main_dag_name=parent_dag_name[0:parent_dag_name.find('.')],
sub_dag_name=parent_dag_name,
dag=dag)
return dag
def deploy_site_drydock(parent_dag_name, child_dag_name, args):
'''
Puts all of the drydock deploy site into atomic unit
DryDock Subdag
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args)
drydock_client = SubDagOperator(
subdag=get_drydock_subdag_step(dag.dag_id,
CREATE_DRYDOCK_CLIENT_DAG_NAME,
args),
task_id=CREATE_DRYDOCK_CLIENT_DAG_NAME,
drydock_client = DryDockOperator(
task_id='create_drydock_client',
shipyard_conf=config_path,
action='create_drydock_client',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
drydock_verify_site = SubDagOperator(
subdag=get_drydock_subdag_step(dag.dag_id,
DRYDOCK_VERIFY_SITE_DAG_NAME,
args),
task_id=DRYDOCK_VERIFY_SITE_DAG_NAME,
drydock_verify_site = DryDockOperator(
task_id='verify_site',
shipyard_conf=config_path,
action='verify_site',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
drydock_prepare_site = SubDagOperator(
subdag=get_drydock_subdag_step(dag.dag_id,
DRYDOCK_PREPARE_SITE_DAG_NAME,
args),
task_id=DRYDOCK_PREPARE_SITE_DAG_NAME,
drydock_prepare_site = DryDockOperator(
task_id='prepare_site',
shipyard_conf=config_path,
action='prepare_site',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
drydock_prepare_nodes = SubDagOperator(
subdag=get_drydock_subdag_step(dag.dag_id,
DRYDOCK_PREPARE_NODE_DAG_NAME,
args),
task_id=DRYDOCK_PREPARE_NODE_DAG_NAME,
drydock_prepare_nodes = DryDockOperator(
task_id='prepare_nodes',
shipyard_conf=config_path,
action='prepare_nodes',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
drydock_deploy_nodes = SubDagOperator(
subdag=get_drydock_subdag_step(dag.dag_id,
DRYDOCK_DEPLOY_NODE_DAG_NAME,
args),
task_id=DRYDOCK_DEPLOY_NODE_DAG_NAME,
drydock_deploy_nodes = DryDockOperator(
task_id='deploy_nodes',
shipyard_conf=config_path,
action='deploy_nodes',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# DAG Wiring
# Define dependencies
drydock_verify_site.set_upstream(drydock_client)
drydock_prepare_site.set_upstream(drydock_verify_site)
drydock_prepare_nodes.set_upstream(drydock_prepare_site)

View File

@ -13,159 +13,72 @@
# limitations under the License.
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators import K8sHealthCheckOperator
from airflow.operators import UcpHealthCheckOperator
# Location of shiyard.conf
config_path = '/usr/local/airflow/plugins/shipyard.conf'
# Note that the shipyard.conf file needs to be placed on a volume
# that can be accessed by the containers
config_path = '/usr/local/airflow/plugins/shipyard.conf'
def k8s_preflight_check(parent_dag_name, child_dag_name, args):
# TODO: Add Checks for Promenade when the API is ready
def all_preflight_checks(parent_dag_name, child_dag_name, args):
'''
The k8s_preflight_check checks that k8s is in a good state for
the purposes of the Undercloud Platform to proceed with processing
Pre-Flight Checks Subdag
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
default_args=args)
# Ensure k8s is up and running.
# Ensure that pods are not crashed
operator = K8sHealthCheckOperator(
'''
The k8s_preflight_check checks that k8s is in a good state
for the purposes of the Undercloud Platform to proceed with
processing
'''
k8s = K8sHealthCheckOperator(
task_id='k8s_preflight_check',
dag=dag)
return dag
def shipyard_preflight_check(parent_dag_name, child_dag_name, args):
'''
Checks that shipyard is in a good state for
the purposes of the Undercloud Platform to proceed with processing
Checks that shipyard is in a good state for the purposes of the
Undercloud Platform to proceed with processing
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
# Ensure shipyard is up and running.
operator = UcpHealthCheckOperator(
shipyard = UcpHealthCheckOperator(
task_id='shipyard_preflight_check',
shipyard_conf=config_path,
ucp_node='shipyard',
dag=dag)
return dag
def deckhand_preflight_check(
parent_dag_name,
child_dag_name,
args, ):
'''
Checks that deckhand is in a good state for
the purposes of the Undercloud Platform to proceed with processing
Checks that deckhand is in a good state for the purposes of the
Undercloud Platform to proceed with processing
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
# Ensure deckhand is up and running.
operator = UcpHealthCheckOperator(
deckhand = UcpHealthCheckOperator(
task_id='deckhand_preflight_check',
shipyard_conf=config_path,
ucp_node='deckhand',
dag=dag)
return dag
def drydock_preflight_check(parent_dag_name, child_dag_name, args):
'''
Checks that drydock is in a good state for
the purposes of the Undercloud Platform to proceed with processing
Checks that drydock is in a good state for the purposes of the
Undercloud Platform to proceed with processing
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
# Ensure drydock is up and running.
operator = UcpHealthCheckOperator(
drydock = UcpHealthCheckOperator(
task_id='drydock_preflight_check',
shipyard_conf=config_path,
ucp_node='drydock',
dag=dag)
return dag
def armada_preflight_check(parent_dag_name, child_dag_name, args):
'''
Checks that armada is in a good state for
the purposes of the Undercloud Platform to proceed with processing
Checks that armada is in a good state for the purposes of the
Undercloud Platform to proceed with processing
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
# Ensure armada is up and running.
operator = UcpHealthCheckOperator(
armada = UcpHealthCheckOperator(
task_id='armada_preflight_check',
shipyard_conf=config_path,
ucp_node='armada',
dag=dag)
return dag
# Names used for sub-subdags in the all preflight check subdag
K8S_PREFLIGHT_CHECK_DAG_NAME = 'k8s_preflight_check'
SHIPYARD_PREFLIGHT_CHECK_DAG_NAME = 'shipyard_preflight_check'
DECKHAND_PREFLIGHT_CHECK_DAG_NAME = 'deckhand_preflight_check'
DRYDOCK_PREFLIGHT_CHECK_DAG_NAME = 'drydock_preflight_check'
ARMADA_PREFLIGHT_CHECK_DAG_NAME = 'armada_preflight_check'
def all_preflight_checks(parent_dag_name, child_dag_name, args):
'''
puts all of the preflight checks into an atomic unit.
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
k8s = SubDagOperator(
subdag=k8s_preflight_check(dag.dag_id, K8S_PREFLIGHT_CHECK_DAG_NAME,
args),
task_id=K8S_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, )
shipyard = SubDagOperator(
subdag=shipyard_preflight_check(
dag.dag_id, SHIPYARD_PREFLIGHT_CHECK_DAG_NAME, args),
task_id=SHIPYARD_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, )
deckhand = SubDagOperator(
subdag=deckhand_preflight_check(
dag.dag_id, DECKHAND_PREFLIGHT_CHECK_DAG_NAME, args),
task_id=DECKHAND_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, )
drydock = SubDagOperator(
subdag=drydock_preflight_check(dag.dag_id,
DRYDOCK_PREFLIGHT_CHECK_DAG_NAME, args),
task_id=DRYDOCK_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, )
armada = SubDagOperator(
subdag=armada_preflight_check(dag.dag_id,
ARMADA_PREFLIGHT_CHECK_DAG_NAME, args),
task_id=ARMADA_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, )
return dag

View File

@ -16,47 +16,12 @@ from airflow.models import DAG
from airflow.operators import DeckhandOperator
from airflow.operators import DryDockOperator
from airflow.operators import PlaceholderOperator
from airflow.operators.subdag_operator import SubDagOperator
'''
Note that in the event where the 'deploy_site' Action is triggered
from Shipyard, the 'parent_dag_name' variable gets assigned with
'deploy_site.validate_site_design'. The name of the main dag will
be the front part of that value, i.e. 'deploy_site'. Hence we will
extract the front part and assign it to main_dag for the functions
defined below
'''
# Location of shiyard.conf
# Note that the shipyard.conf file needs to be placed on a volume
# that can be accessed by the containers
config_path = '/usr/local/airflow/plugins/shipyard.conf'
# Names used for sub-subdags in UCP components design verification
DECKHAND_VALIDATE_DOCS_DAG_NAME = 'deckhand_validate_site_design'
def deckhand_validate_site_design(parent_dag_name, child_dag_name, args):
'''
Validate Site Design - Deckhand
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
# Assigns value 'deploy_site.deckhand_validate_site_design' to
# the sub_dag
child_dag = parent_dag_name[0:parent_dag_name.find('.')] + \
'.' + DECKHAND_VALIDATE_DOCS_DAG_NAME
operator = DeckhandOperator(
task_id=DECKHAND_VALIDATE_DOCS_DAG_NAME,
shipyard_conf=config_path,
action=DECKHAND_VALIDATE_DOCS_DAG_NAME,
main_dag_name=parent_dag_name[0:parent_dag_name.find('.')],
sub_dag_name=child_dag,
dag=dag)
return dag
def validate_site_design(parent_dag_name, child_dag_name, args):
'''
@ -66,17 +31,20 @@ def validate_site_design(parent_dag_name, child_dag_name, args):
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args)
deckhand_validate_docs = SubDagOperator(
subdag=deckhand_validate_site_design(dag.dag_id,
DECKHAND_VALIDATE_DOCS_DAG_NAME,
args),
task_id=DECKHAND_VALIDATE_DOCS_DAG_NAME,
deckhand_validate_docs = DeckhandOperator(
task_id='deckhand_validate_site_design',
shipyard_conf=config_path,
action='deckhand_validate_site_design',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
drydock_validate_docs = DryDockOperator(
task_id='drydock_validate_site_design',
shipyard_conf=config_path,
action='validate_site_design',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# TODO () use the real operator here

View File

@ -89,7 +89,7 @@ class ArmadaOperator(BaseOperator):
# Retrieve armada_client via XCOM so as to perform other tasks
armada_client = task_instance.xcom_pull(
task_ids='create_armada_client',
dag_id=self.sub_dag_name + '.create_armada_client')
dag_id=self.main_dag_name + '.' + self.sub_dag_name)
# Retrieve Tiller Information and assign to context 'query'
context['query'] = self.get_tiller_info(context)

View File

@ -90,14 +90,14 @@ class DeckhandOperator(BaseOperator):
# Validate Design using DeckHand
elif self.action == 'deckhand_validate_site_design':
# Retrieve revision_id from xcom
# Note that in the case of 'deploy_site', the dag_id will be
# 'deploy_site.deckhand_get_design_version.deckhand_get_design_version'
# for the 'deckhand_get_design_version' task. We need to extract
# the xcom value from it in order to get the value of the last
# committed revision ID
# Note that in the case of 'deploy_site', the dag_id will
# be 'deploy_site.deckhand_get_design_version' for the
# 'deckhand_get_design_version' task. We need to extract
# the xcom value from it in order to get the value of the
# last committed revision ID
context['revision_id'] = task_instance.xcom_pull(
task_ids='deckhand_get_design_version',
dag_id=self.main_dag_name + '.deckhand_get_design_version' * 2)
dag_id=self.main_dag_name + '.deckhand_get_design_version')
logging.info("Revision ID is %d", context['revision_id'])
self.deckhand_validate_site(context)

View File

@ -112,15 +112,23 @@ class DryDockOperator(BaseOperator):
# Drydock Validate Site Design
if self.action == 'validate_site_design':
# Initialize variable
site_design_validity = 'invalid'
# Reset 'svc_type' to DryDock instead of DeckHand
context['svc_type'] = 'physicalprovisioner'
# Retrieve Endpoint Information
context['svc_endpoint'] = ucp_service_endpoint(self, context)
self.drydock_validate_design(context)
site_design_validity = self.drydock_validate_design(context)
return site_design_validity
# Retrieve drydock_client via XCOM so as to perform other tasks
drydock_client = task_instance.xcom_pull(
task_ids='create_drydock_client',
dag_id=self.sub_dag_name + '.create_drydock_client')
dag_id=self.main_dag_name + '.' + self.sub_dag_name)
# Read shipyard.conf
config = configparser.ConfigParser()
@ -325,14 +333,14 @@ class DryDockOperator(BaseOperator):
logging.info("Deckhand endpoint is %s", context['svc_endpoint'])
# Retrieve revision_id from xcom
# Note that in the case of 'deploy_site', the dag_id will be
# 'deploy_site.deckhand_get_design_version.deckhand_get_design_version'
# for the 'deckhand_get_design_version' task. We need to extract
# the xcom value from it in order to get the value of the last
# committed revision ID
# Note that in the case of 'deploy_site', the dag_id will
# be 'deploy_site.deckhand_get_design_version' for the
# 'deckhand_get_design_version' task. We need to extract
# the xcom value from it in order to get the value of the
# last committed revision ID
committed_revision_id = context['task_instance'].xcom_pull(
task_ids='deckhand_get_design_version',
dag_id=self.main_dag_name + '.deckhand_get_design_version' * 2)
dag_id=self.main_dag_name + '.deckhand_get_design_version')
# Form DeckHand Design Reference Path that we will use to retrieve
# the DryDock YAMLs
@ -378,13 +386,14 @@ class DryDockOperator(BaseOperator):
# Convert response to string
validate_site_design = design_validate_response.text
# Prints response
# Print response
logging.info("Retrieving DryDock validate site design response...")
logging.debug(json.loads(validate_site_design))
logging.info(json.loads(validate_site_design))
# Check if site design is valid
if json.loads(validate_site_design).get('status') == 'Valid':
logging.info("DryDock Site Design has been successfully validated")
return 'valid'
else:
raise AirflowException("DryDock Site Design Validation Failed!")