diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py index 9424322c..78749c82 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py @@ -14,8 +14,6 @@ from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import BranchPythonOperator from airflow.operators.python_operator import PythonOperator -from airflow.operators.subdag_operator import SubDagOperator - try: # Operators are loaded from being registered to airflow.operators @@ -182,7 +180,7 @@ class CommonStepFactory(object): verify_nodes_exist=verify_nodes_exist ) - def get_relabel_nodes(self, task_id=dn.RELABEL_NODES_DAG_NAME): + def get_relabel_nodes(self, task_id=dn.RELABEL_NODES_TG_NAME): """Generate the relabel nodes step This step uses Drydock to relabel select nodes. @@ -221,8 +219,8 @@ class CommonStepFactory(object): This version of destroying servers does no pre-validations or extra shutdowns of anything. It unconditionally triggers Drydock to destroy - the server. The counterpart to this step is the subdag returned by the - get_destroy_server method below. + the server. The counterpart to this step is the TaskGroup returned by + the get_destroy_server method below. """ return DrydockDestroyNodeOperator( shipyard_conf=config_path, @@ -231,19 +229,14 @@ class CommonStepFactory(object): on_failure_callback=step_failure_handler, dag=self.dag) - def get_destroy_server(self, task_id=dn.DESTROY_SERVER_DAG_NAME): + def get_destroy_server(self): """Generate a destroy server step Destroy server tears down kubernetes and hardware """ - return SubDagOperator( - subdag=destroy_server( - self.parent_dag_name, - task_id, - args=self.default_args), - task_id=task_id, - on_failure_callback=step_failure_handler, - dag=self.dag) + return destroy_server( + self.dag + ) def get_decide_airflow_upgrade(self, task_id=dn.DECIDE_AIRFLOW_UPGRADE): """Generate the decide_airflow_upgrade step @@ -258,13 +251,13 @@ class CommonStepFactory(object): worker. The decision will be based on the xcom value that is retrieved from the 'armada_post_apply' task """ - # DAG ID will be parent + subdag name - dag_id = self.parent_dag_name + '.' + dn.ARMADA_BUILD_DAG_NAME + # DAG ID will be parent + dag_id = self.parent_dag_name # Check if Shipyard/Airflow were upgraded by the workflow upgrade_airflow = kwargs['ti'].xcom_pull( key='upgrade_airflow_worker', - task_ids='armada_post_apply', + task_ids='armada_build.armada_post_apply', dag_id=dag_id) # Go to the branch to upgrade Airflow worker if the Shipyard diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py index a24e3fc6..2ac7b9e5 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py @@ -12,17 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -# Subdags -ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight' +# TaskGroups +ALL_PREFLIGHT_CHECKS_TG_NAME = 'preflight' UCP_PREFLIGHT_NAME = 'ucp_preflight_check' -ARMADA_BUILD_DAG_NAME = 'armada_build' -DESTROY_SERVER_DAG_NAME = 'destroy_server' -DRYDOCK_BUILD_DAG_NAME = 'drydock_build' -VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design' -RELABEL_NODES_DAG_NAME = 'relabel_nodes' +ARMADA_BUILD_TG_NAME = 'armada_build' +DESTROY_SERVER_TG_NAME = 'destroy_server' +DRYDOCK_BUILD_TG_NAME = 'drydock_build' +VALIDATE_SITE_DESIGN_TG_NAME = 'validate_site_design' +RELABEL_NODES_TG_NAME = 'relabel_nodes' # Steps ACTION_XCOM = 'action_xcom' +ARMADA_BUILD_ARMADA_POST_APPLY = 'armada_build.armada_post_apply' +ARMADA_BUILD_ARMADA_GET_RELEASES = 'armada_build.armada_get_releases' ARMADA_TEST_RELEASES = 'armada_test_releases' CONCURRENCY_CHECK = 'dag_concurrency_check' CREATE_ACTION_TAG = 'create_action_tag' @@ -39,7 +41,8 @@ FINAL_DEPLOYMENT_STATUS = 'final_deployment_status' # Define a list of critical steps, used to determine successfulness of a # still-running DAG CRITICAL_DAG_STEPS = [ - ARMADA_BUILD_DAG_NAME, + ARMADA_BUILD_ARMADA_POST_APPLY, + ARMADA_BUILD_ARMADA_GET_RELEASES, SKIP_UPGRADE_AIRFLOW, UPGRADE_AIRFLOW, ARMADA_TEST_RELEASES diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/destroy_node.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/destroy_node.py index 6ac45d10..ab11b0cf 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/destroy_node.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/destroy_node.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from airflow.models import DAG +from airflow.utils.task_group import TaskGroup try: from airflow.operators import DrydockDestroyNodeOperator @@ -38,65 +38,56 @@ except ImportError: from shipyard_airflow.dags.config_path import config_path -def destroy_server(parent_dag_name, child_dag_name, args): +def destroy_server(dag): """DAG to tear down node The DAG will make use of the promenade and drydock operators to tear down a target node. """ - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, - schedule_interval=None) + with TaskGroup(group_id="destroy_server", dag=dag) as destroy_server: - # Drain Node - promenade_drain_node = PromenadeDrainNodeOperator( - task_id='promenade_drain_node', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - dag=dag) + # Drain Node + promenade_drain_node = PromenadeDrainNodeOperator( + task_id='promenade_drain_node', + shipyard_conf=config_path, + dag=dag) - # Clear Labels - promenade_clear_labels = PromenadeClearLabelsOperator( - task_id='promenade_clear_labels', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - dag=dag) + # Clear Labels + promenade_clear_labels = PromenadeClearLabelsOperator( + task_id='promenade_clear_labels', + shipyard_conf=config_path, + dag=dag) - # Shutdown Kubelet - promenade_shutdown_kubelet = PromenadeShutdownKubeletOperator( - task_id='promenade_shutdown_kubelet', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - dag=dag) + # Shutdown Kubelet + promenade_shutdown_kubelet = PromenadeShutdownKubeletOperator( + task_id='promenade_shutdown_kubelet', + shipyard_conf=config_path, + dag=dag) - # ETCD Sanity Check - promenade_check_etcd = PromenadeCheckEtcdOperator( - task_id='promenade_check_etcd', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - dag=dag) + # ETCD Sanity Check + promenade_check_etcd = PromenadeCheckEtcdOperator( + task_id='promenade_check_etcd', + shipyard_conf=config_path, + dag=dag) - # Power down and destroy node using DryDock - drydock_destroy_node = DrydockDestroyNodeOperator( - task_id='destroy_node', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - dag=dag) + # Power down and destroy node using DryDock + drydock_destroy_node = DrydockDestroyNodeOperator( + task_id='destroy_node', + shipyard_conf=config_path, + dag=dag) - # Decommission node from Kubernetes cluster using Promenade - promenade_decommission_node = PromenadeDecommissionNodeOperator( - task_id='promenade_decommission_node', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - dag=dag) + # Decommission node from Kubernetes cluster using Promenade + promenade_decommission_node = PromenadeDecommissionNodeOperator( + task_id='promenade_decommission_node', + shipyard_conf=config_path, + dag=dag) - # Define dependencies - promenade_clear_labels.set_upstream(promenade_drain_node) - promenade_shutdown_kubelet.set_upstream(promenade_clear_labels) - promenade_check_etcd.set_upstream(promenade_shutdown_kubelet) - drydock_destroy_node.set_upstream(promenade_check_etcd) - promenade_decommission_node.set_upstream(drydock_destroy_node) + # Define dependencies + promenade_clear_labels.set_upstream(promenade_drain_node) + promenade_shutdown_kubelet.set_upstream(promenade_clear_labels) + promenade_check_etcd.set_upstream(promenade_shutdown_kubelet) + drydock_destroy_node.set_upstream(promenade_check_etcd) + promenade_decommission_node.set_upstream(drydock_destroy_node) - return dag + return destroy_server diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/preflight_checks.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/preflight_checks.py index fa9e0859..52f34c4c 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/preflight_checks.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/preflight_checks.py @@ -25,7 +25,7 @@ except ImportError: def all_preflight_checks(dag): ''' - Pre-Flight Checks Subdag + Pre-Flight Checks TaskGroup ''' with TaskGroup(group_id="preflight", dag=dag) as preflight: ''' diff --git a/src/bin/shipyard_airflow/tests/unit/control/test_action_helper.py b/src/bin/shipyard_airflow/tests/unit/control/test_action_helper.py index bd0ad3d1..4930e972 100644 --- a/src/bin/shipyard_airflow/tests/unit/control/test_action_helper.py +++ b/src/bin/shipyard_airflow/tests/unit/control/test_action_helper.py @@ -86,7 +86,7 @@ def get_fake_latest_step_dict_failed(): :rtype: dict """ return { - 'armada_build': {'state': 'failed'}, + 'armada_build.armada_post_apply': {'state': 'failed'}, 'arbitrary_step': {'state': 'success'}, 'another_arbitrary_step': {'state': 'running'}, 'upgrade_airflow': {'state': 'success'},