From 4de8c00830dcf32217bf143ccc336cbc0d8252ee Mon Sep 17 00:00:00 2001 From: Anthony Lin Date: Thu, 14 Dec 2017 07:24:18 +0000 Subject: [PATCH] Collapse Extra Subdag Layer Across Dags We will collapse the extra subdag layer which is not really needed for functionalities. This will make the dags leaner and easier to control. Corresponding changes have been made to the Operators as well Change-Id: I84135a15357abf902d93f3cb085c7c9a21c38b7d --- shipyard_airflow/dags/armada_deploy_site.py | 12 ++ shipyard_airflow/dags/deckhand_get_design.py | 42 ++---- shipyard_airflow/dags/drydock_deploy_site.py | 96 +++++-------- shipyard_airflow/dags/preflight_checks.py | 133 +++--------------- shipyard_airflow/dags/validate_site_design.py | 52 ++----- shipyard_airflow/plugins/armada_operator.py | 2 +- shipyard_airflow/plugins/deckhand_operator.py | 12 +- shipyard_airflow/plugins/drydock_operators.py | 29 ++-- 8 files changed, 113 insertions(+), 265 deletions(-) diff --git a/shipyard_airflow/dags/armada_deploy_site.py b/shipyard_airflow/dags/armada_deploy_site.py index fc0282b2..e33649a3 100644 --- a/shipyard_airflow/dags/armada_deploy_site.py +++ b/shipyard_airflow/dags/armada_deploy_site.py @@ -16,6 +16,8 @@ from airflow.models import DAG from airflow.operators import ArmadaOperator # Location of shiyard.conf +# Note that the shipyard.conf file needs to be placed on a volume +# that can be accessed by the containers config_path = '/usr/local/airflow/plugins/shipyard.conf' @@ -32,6 +34,8 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args): task_id='create_armada_client', shipyard_conf=config_path, action='create_armada_client', + main_dag_name=parent_dag_name, + sub_dag_name=child_dag_name, dag=dag) # Get Tiller Status @@ -39,6 +43,8 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args): task_id='armada_status', shipyard_conf=config_path, action='armada_status', + main_dag_name=parent_dag_name, + sub_dag_name=child_dag_name, dag=dag) # Validate Armada YAMLs @@ -46,6 +52,8 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args): task_id='armada_validate', shipyard_conf=config_path, action='armada_validate', + main_dag_name=parent_dag_name, + sub_dag_name=child_dag_name, dag=dag) # Armada Apply @@ -53,6 +61,8 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args): task_id='armada_apply', shipyard_conf=config_path, action='armada_apply', + main_dag_name=parent_dag_name, + sub_dag_name=child_dag_name, retries=10, dag=dag) @@ -61,6 +71,8 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args): task_id='armada_get_releases', shipyard_conf=config_path, action='armada_get_releases', + main_dag_name=parent_dag_name, + sub_dag_name=child_dag_name, dag=dag) # Define dependencies diff --git a/shipyard_airflow/dags/deckhand_get_design.py b/shipyard_airflow/dags/deckhand_get_design.py index e1a5a08c..79bd0e52 100644 --- a/shipyard_airflow/dags/deckhand_get_design.py +++ b/shipyard_airflow/dags/deckhand_get_design.py @@ -14,14 +14,15 @@ from airflow.models import DAG from airflow.operators import DeckhandOperator -from airflow.operators.subdag_operator import SubDagOperator # Location of shiyard.conf +# Note that the shipyard.conf file needs to be placed on a volume +# that can be accessed by the containers config_path = '/usr/local/airflow/plugins/shipyard.conf' -def get_design_version(parent_dag_name, child_dag_name, args): +def get_design_deckhand(parent_dag_name, child_dag_name, args): ''' Get Deckhand Design Version ''' @@ -29,39 +30,12 @@ def get_design_version(parent_dag_name, child_dag_name, args): '{}.{}'.format(parent_dag_name, child_dag_name), default_args=args) - # Note that in the event where the 'deploy_site' Action is - # triggered from Shipyard, the 'parent_dag_name' variable - # gets assigned with 'deploy_site.deckhand_get_design_version'. - # This is the name that we want to assign to the subdag so - # that we can reference it for xcom. The name of the main - # dag will be the front part of that value, i.e. 'deploy_site'. - # Hence we will extract the front part and assign it to main_dag. - # We will reuse this pattern for other Actions, e.g. update_site, - # redeploy_site as well. - operator = DeckhandOperator( - task_id=child_dag_name, + deckhand_design = DeckhandOperator( + task_id='deckhand_get_design_version', shipyard_conf=config_path, - action=child_dag_name, - main_dag_name=parent_dag_name[0:parent_dag_name.find('.')], - sub_dag_name=parent_dag_name, - dag=dag) - - return dag - - -def get_design_deckhand(parent_dag_name, child_dag_name, args): - ''' - Puts into atomic unit - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args) - - deckhand_design = SubDagOperator( - subdag=get_design_version(dag.dag_id, - child_dag_name, - args), - task_id=child_dag_name, + action='deckhand_get_design_version', + main_dag_name=parent_dag_name, + sub_dag_name=child_dag_name, dag=dag) return dag diff --git a/shipyard_airflow/dags/drydock_deploy_site.py b/shipyard_airflow/dags/drydock_deploy_site.py index 001f3614..728938fc 100644 --- a/shipyard_airflow/dags/drydock_deploy_site.py +++ b/shipyard_airflow/dags/drydock_deploy_site.py @@ -13,92 +13,64 @@ # limitations under the License. from airflow.models import DAG -from airflow.operators.subdag_operator import SubDagOperator from airflow.operators import DryDockOperator # Location of shiyard.conf +# Note that the shipyard.conf file needs to be placed on a volume +# that can be accessed by the containers config_path = '/usr/local/airflow/plugins/shipyard.conf' -# Names used for sub-subdags in the drydock site deployment subdag -CREATE_DRYDOCK_CLIENT_DAG_NAME = 'create_drydock_client' -DRYDOCK_VERIFY_SITE_DAG_NAME = 'verify_site' -DRYDOCK_PREPARE_SITE_DAG_NAME = 'prepare_site' -DRYDOCK_PREPARE_NODE_DAG_NAME = 'prepare_nodes' -DRYDOCK_DEPLOY_NODE_DAG_NAME = 'deploy_nodes' - - -def get_drydock_subdag_step(parent_dag_name, child_dag_name, args): - ''' - Execute DryDock Subdag - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args) - # Note that in the event where the 'deploy_site' action is - # triggered from Shipyard, the 'parent_dag_name' variable - # gets assigned with 'deploy_site.create_drydock_client'. - # This is the name that we want to assign to the subdag so - # that we can reference it for xcom. The name of the main - # dag will be the front part of that value, i.e. 'deploy_site'. - # Hence we will extract the front part and assign it to main_dag. - # We will reuse this pattern for other Actions, e.g. update_site, - # redeploy_site as well. - operator = DryDockOperator( - task_id=child_dag_name, - shipyard_conf=config_path, - action=child_dag_name, - main_dag_name=parent_dag_name[0:parent_dag_name.find('.')], - sub_dag_name=parent_dag_name, - dag=dag) - - return dag - def deploy_site_drydock(parent_dag_name, child_dag_name, args): ''' - Puts all of the drydock deploy site into atomic unit + DryDock Subdag ''' dag = DAG( '{}.{}'.format(parent_dag_name, child_dag_name), default_args=args) - drydock_client = SubDagOperator( - subdag=get_drydock_subdag_step(dag.dag_id, - CREATE_DRYDOCK_CLIENT_DAG_NAME, - args), - task_id=CREATE_DRYDOCK_CLIENT_DAG_NAME, + drydock_client = DryDockOperator( + task_id='create_drydock_client', + shipyard_conf=config_path, + action='create_drydock_client', + main_dag_name=parent_dag_name, + sub_dag_name=child_dag_name, dag=dag) - drydock_verify_site = SubDagOperator( - subdag=get_drydock_subdag_step(dag.dag_id, - DRYDOCK_VERIFY_SITE_DAG_NAME, - args), - task_id=DRYDOCK_VERIFY_SITE_DAG_NAME, + drydock_verify_site = DryDockOperator( + task_id='verify_site', + shipyard_conf=config_path, + action='verify_site', + main_dag_name=parent_dag_name, + sub_dag_name=child_dag_name, dag=dag) - drydock_prepare_site = SubDagOperator( - subdag=get_drydock_subdag_step(dag.dag_id, - DRYDOCK_PREPARE_SITE_DAG_NAME, - args), - task_id=DRYDOCK_PREPARE_SITE_DAG_NAME, + drydock_prepare_site = DryDockOperator( + task_id='prepare_site', + shipyard_conf=config_path, + action='prepare_site', + main_dag_name=parent_dag_name, + sub_dag_name=child_dag_name, dag=dag) - drydock_prepare_nodes = SubDagOperator( - subdag=get_drydock_subdag_step(dag.dag_id, - DRYDOCK_PREPARE_NODE_DAG_NAME, - args), - task_id=DRYDOCK_PREPARE_NODE_DAG_NAME, + drydock_prepare_nodes = DryDockOperator( + task_id='prepare_nodes', + shipyard_conf=config_path, + action='prepare_nodes', + main_dag_name=parent_dag_name, + sub_dag_name=child_dag_name, dag=dag) - drydock_deploy_nodes = SubDagOperator( - subdag=get_drydock_subdag_step(dag.dag_id, - DRYDOCK_DEPLOY_NODE_DAG_NAME, - args), - task_id=DRYDOCK_DEPLOY_NODE_DAG_NAME, + drydock_deploy_nodes = DryDockOperator( + task_id='deploy_nodes', + shipyard_conf=config_path, + action='deploy_nodes', + main_dag_name=parent_dag_name, + sub_dag_name=child_dag_name, dag=dag) - # DAG Wiring + # Define dependencies drydock_verify_site.set_upstream(drydock_client) drydock_prepare_site.set_upstream(drydock_verify_site) drydock_prepare_nodes.set_upstream(drydock_prepare_site) diff --git a/shipyard_airflow/dags/preflight_checks.py b/shipyard_airflow/dags/preflight_checks.py index 198fadd4..6d46922d 100644 --- a/shipyard_airflow/dags/preflight_checks.py +++ b/shipyard_airflow/dags/preflight_checks.py @@ -13,159 +13,72 @@ # limitations under the License. from airflow.models import DAG -from airflow.operators.subdag_operator import SubDagOperator from airflow.operators import K8sHealthCheckOperator from airflow.operators import UcpHealthCheckOperator # Location of shiyard.conf -config_path = '/usr/local/airflow/plugins/shipyard.conf' - # Note that the shipyard.conf file needs to be placed on a volume # that can be accessed by the containers +config_path = '/usr/local/airflow/plugins/shipyard.conf' -def k8s_preflight_check(parent_dag_name, child_dag_name, args): +# TODO: Add Checks for Promenade when the API is ready +def all_preflight_checks(parent_dag_name, child_dag_name, args): ''' - The k8s_preflight_check checks that k8s is in a good state for - the purposes of the Undercloud Platform to proceed with processing + Pre-Flight Checks Subdag ''' dag = DAG( '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) + default_args=args) - # Ensure k8s is up and running. - # Ensure that pods are not crashed - operator = K8sHealthCheckOperator( + ''' + The k8s_preflight_check checks that k8s is in a good state + for the purposes of the Undercloud Platform to proceed with + processing + ''' + k8s = K8sHealthCheckOperator( task_id='k8s_preflight_check', dag=dag) - return dag - - -def shipyard_preflight_check(parent_dag_name, child_dag_name, args): ''' - Checks that shipyard is in a good state for - the purposes of the Undercloud Platform to proceed with processing + Checks that shipyard is in a good state for the purposes of the + Undercloud Platform to proceed with processing ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - # Ensure shipyard is up and running. - operator = UcpHealthCheckOperator( + shipyard = UcpHealthCheckOperator( task_id='shipyard_preflight_check', shipyard_conf=config_path, ucp_node='shipyard', dag=dag) - return dag - - -def deckhand_preflight_check( - parent_dag_name, - child_dag_name, - args, ): ''' - Checks that deckhand is in a good state for - the purposes of the Undercloud Platform to proceed with processing + Checks that deckhand is in a good state for the purposes of the + Undercloud Platform to proceed with processing ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - # Ensure deckhand is up and running. - operator = UcpHealthCheckOperator( + deckhand = UcpHealthCheckOperator( task_id='deckhand_preflight_check', shipyard_conf=config_path, ucp_node='deckhand', dag=dag) - return dag - - -def drydock_preflight_check(parent_dag_name, child_dag_name, args): ''' - Checks that drydock is in a good state for - the purposes of the Undercloud Platform to proceed with processing + Checks that drydock is in a good state for the purposes of the + Undercloud Platform to proceed with processing ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - # Ensure drydock is up and running. - operator = UcpHealthCheckOperator( + drydock = UcpHealthCheckOperator( task_id='drydock_preflight_check', shipyard_conf=config_path, ucp_node='drydock', dag=dag) - return dag - - -def armada_preflight_check(parent_dag_name, child_dag_name, args): ''' - Checks that armada is in a good state for - the purposes of the Undercloud Platform to proceed with processing + Checks that armada is in a good state for the purposes of the + Undercloud Platform to proceed with processing ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - # Ensure armada is up and running. - operator = UcpHealthCheckOperator( + armada = UcpHealthCheckOperator( task_id='armada_preflight_check', shipyard_conf=config_path, ucp_node='armada', dag=dag) return dag - - -# Names used for sub-subdags in the all preflight check subdag -K8S_PREFLIGHT_CHECK_DAG_NAME = 'k8s_preflight_check' -SHIPYARD_PREFLIGHT_CHECK_DAG_NAME = 'shipyard_preflight_check' -DECKHAND_PREFLIGHT_CHECK_DAG_NAME = 'deckhand_preflight_check' -DRYDOCK_PREFLIGHT_CHECK_DAG_NAME = 'drydock_preflight_check' -ARMADA_PREFLIGHT_CHECK_DAG_NAME = 'armada_preflight_check' - - -def all_preflight_checks(parent_dag_name, child_dag_name, args): - ''' - puts all of the preflight checks into an atomic unit. - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - k8s = SubDagOperator( - subdag=k8s_preflight_check(dag.dag_id, K8S_PREFLIGHT_CHECK_DAG_NAME, - args), - task_id=K8S_PREFLIGHT_CHECK_DAG_NAME, - dag=dag, ) - - shipyard = SubDagOperator( - subdag=shipyard_preflight_check( - dag.dag_id, SHIPYARD_PREFLIGHT_CHECK_DAG_NAME, args), - task_id=SHIPYARD_PREFLIGHT_CHECK_DAG_NAME, - dag=dag, ) - - deckhand = SubDagOperator( - subdag=deckhand_preflight_check( - dag.dag_id, DECKHAND_PREFLIGHT_CHECK_DAG_NAME, args), - task_id=DECKHAND_PREFLIGHT_CHECK_DAG_NAME, - dag=dag, ) - - drydock = SubDagOperator( - subdag=drydock_preflight_check(dag.dag_id, - DRYDOCK_PREFLIGHT_CHECK_DAG_NAME, args), - task_id=DRYDOCK_PREFLIGHT_CHECK_DAG_NAME, - dag=dag, ) - - armada = SubDagOperator( - subdag=armada_preflight_check(dag.dag_id, - ARMADA_PREFLIGHT_CHECK_DAG_NAME, args), - task_id=ARMADA_PREFLIGHT_CHECK_DAG_NAME, - dag=dag, ) - - return dag diff --git a/shipyard_airflow/dags/validate_site_design.py b/shipyard_airflow/dags/validate_site_design.py index f13c8724..0b2d96da 100644 --- a/shipyard_airflow/dags/validate_site_design.py +++ b/shipyard_airflow/dags/validate_site_design.py @@ -16,47 +16,12 @@ from airflow.models import DAG from airflow.operators import DeckhandOperator from airflow.operators import DryDockOperator from airflow.operators import PlaceholderOperator -from airflow.operators.subdag_operator import SubDagOperator - -''' -Note that in the event where the 'deploy_site' Action is triggered -from Shipyard, the 'parent_dag_name' variable gets assigned with -'deploy_site.validate_site_design'. The name of the main dag will -be the front part of that value, i.e. 'deploy_site'. Hence we will -extract the front part and assign it to main_dag for the functions -defined below -''' # Location of shiyard.conf +# Note that the shipyard.conf file needs to be placed on a volume +# that can be accessed by the containers config_path = '/usr/local/airflow/plugins/shipyard.conf' -# Names used for sub-subdags in UCP components design verification -DECKHAND_VALIDATE_DOCS_DAG_NAME = 'deckhand_validate_site_design' - - -def deckhand_validate_site_design(parent_dag_name, child_dag_name, args): - ''' - Validate Site Design - Deckhand - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - # Assigns value 'deploy_site.deckhand_validate_site_design' to - # the sub_dag - child_dag = parent_dag_name[0:parent_dag_name.find('.')] + \ - '.' + DECKHAND_VALIDATE_DOCS_DAG_NAME - - operator = DeckhandOperator( - task_id=DECKHAND_VALIDATE_DOCS_DAG_NAME, - shipyard_conf=config_path, - action=DECKHAND_VALIDATE_DOCS_DAG_NAME, - main_dag_name=parent_dag_name[0:parent_dag_name.find('.')], - sub_dag_name=child_dag, - dag=dag) - - return dag - def validate_site_design(parent_dag_name, child_dag_name, args): ''' @@ -66,17 +31,20 @@ def validate_site_design(parent_dag_name, child_dag_name, args): '{}.{}'.format(parent_dag_name, child_dag_name), default_args=args) - deckhand_validate_docs = SubDagOperator( - subdag=deckhand_validate_site_design(dag.dag_id, - DECKHAND_VALIDATE_DOCS_DAG_NAME, - args), - task_id=DECKHAND_VALIDATE_DOCS_DAG_NAME, + deckhand_validate_docs = DeckhandOperator( + task_id='deckhand_validate_site_design', + shipyard_conf=config_path, + action='deckhand_validate_site_design', + main_dag_name=parent_dag_name, + sub_dag_name=child_dag_name, dag=dag) drydock_validate_docs = DryDockOperator( task_id='drydock_validate_site_design', shipyard_conf=config_path, action='validate_site_design', + main_dag_name=parent_dag_name, + sub_dag_name=child_dag_name, dag=dag) # TODO () use the real operator here diff --git a/shipyard_airflow/plugins/armada_operator.py b/shipyard_airflow/plugins/armada_operator.py index 0e963cf4..3ac8af86 100644 --- a/shipyard_airflow/plugins/armada_operator.py +++ b/shipyard_airflow/plugins/armada_operator.py @@ -89,7 +89,7 @@ class ArmadaOperator(BaseOperator): # Retrieve armada_client via XCOM so as to perform other tasks armada_client = task_instance.xcom_pull( task_ids='create_armada_client', - dag_id=self.sub_dag_name + '.create_armada_client') + dag_id=self.main_dag_name + '.' + self.sub_dag_name) # Retrieve Tiller Information and assign to context 'query' context['query'] = self.get_tiller_info(context) diff --git a/shipyard_airflow/plugins/deckhand_operator.py b/shipyard_airflow/plugins/deckhand_operator.py index 8cd113cf..925c8613 100644 --- a/shipyard_airflow/plugins/deckhand_operator.py +++ b/shipyard_airflow/plugins/deckhand_operator.py @@ -90,14 +90,14 @@ class DeckhandOperator(BaseOperator): # Validate Design using DeckHand elif self.action == 'deckhand_validate_site_design': # Retrieve revision_id from xcom - # Note that in the case of 'deploy_site', the dag_id will be - # 'deploy_site.deckhand_get_design_version.deckhand_get_design_version' - # for the 'deckhand_get_design_version' task. We need to extract - # the xcom value from it in order to get the value of the last - # committed revision ID + # Note that in the case of 'deploy_site', the dag_id will + # be 'deploy_site.deckhand_get_design_version' for the + # 'deckhand_get_design_version' task. We need to extract + # the xcom value from it in order to get the value of the + # last committed revision ID context['revision_id'] = task_instance.xcom_pull( task_ids='deckhand_get_design_version', - dag_id=self.main_dag_name + '.deckhand_get_design_version' * 2) + dag_id=self.main_dag_name + '.deckhand_get_design_version') logging.info("Revision ID is %d", context['revision_id']) self.deckhand_validate_site(context) diff --git a/shipyard_airflow/plugins/drydock_operators.py b/shipyard_airflow/plugins/drydock_operators.py index 1867c4e0..ae924417 100644 --- a/shipyard_airflow/plugins/drydock_operators.py +++ b/shipyard_airflow/plugins/drydock_operators.py @@ -112,15 +112,23 @@ class DryDockOperator(BaseOperator): # Drydock Validate Site Design if self.action == 'validate_site_design': + # Initialize variable + site_design_validity = 'invalid' + + # Reset 'svc_type' to DryDock instead of DeckHand + context['svc_type'] = 'physicalprovisioner' + # Retrieve Endpoint Information context['svc_endpoint'] = ucp_service_endpoint(self, context) - self.drydock_validate_design(context) + site_design_validity = self.drydock_validate_design(context) + + return site_design_validity # Retrieve drydock_client via XCOM so as to perform other tasks drydock_client = task_instance.xcom_pull( task_ids='create_drydock_client', - dag_id=self.sub_dag_name + '.create_drydock_client') + dag_id=self.main_dag_name + '.' + self.sub_dag_name) # Read shipyard.conf config = configparser.ConfigParser() @@ -325,14 +333,14 @@ class DryDockOperator(BaseOperator): logging.info("Deckhand endpoint is %s", context['svc_endpoint']) # Retrieve revision_id from xcom - # Note that in the case of 'deploy_site', the dag_id will be - # 'deploy_site.deckhand_get_design_version.deckhand_get_design_version' - # for the 'deckhand_get_design_version' task. We need to extract - # the xcom value from it in order to get the value of the last - # committed revision ID + # Note that in the case of 'deploy_site', the dag_id will + # be 'deploy_site.deckhand_get_design_version' for the + # 'deckhand_get_design_version' task. We need to extract + # the xcom value from it in order to get the value of the + # last committed revision ID committed_revision_id = context['task_instance'].xcom_pull( task_ids='deckhand_get_design_version', - dag_id=self.main_dag_name + '.deckhand_get_design_version' * 2) + dag_id=self.main_dag_name + '.deckhand_get_design_version') # Form DeckHand Design Reference Path that we will use to retrieve # the DryDock YAMLs @@ -378,13 +386,14 @@ class DryDockOperator(BaseOperator): # Convert response to string validate_site_design = design_validate_response.text - # Prints response + # Print response logging.info("Retrieving DryDock validate site design response...") - logging.debug(json.loads(validate_site_design)) + logging.info(json.loads(validate_site_design)) # Check if site design is valid if json.loads(validate_site_design).get('status') == 'Valid': logging.info("DryDock Site Design has been successfully validated") + return 'valid' else: raise AirflowException("DryDock Site Design Validation Failed!")