Airflow fix #2

This patchset fixes one more SubDAG and ensures that the decide_airflow_upgrade
task works correctly after the armada_build SubDAG was previously replaced
with a corresponding TaskGroup. A related unit test has also been fixed.

Change-Id: Ic8304cc1985f8197d363861c588b8b48e13fe5d4
This commit is contained in:
Sergiy Markin 2023-09-15 22:22:29 +00:00
parent 99c2da745a
commit 1177707fae
5 changed files with 63 additions and 76 deletions

View File

@ -14,8 +14,6 @@
from airflow.operators.bash_operator import BashOperator from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
try: try:
# Operators are loaded from being registered to airflow.operators # Operators are loaded from being registered to airflow.operators
@ -182,7 +180,7 @@ class CommonStepFactory(object):
verify_nodes_exist=verify_nodes_exist verify_nodes_exist=verify_nodes_exist
) )
def get_relabel_nodes(self, task_id=dn.RELABEL_NODES_DAG_NAME): def get_relabel_nodes(self, task_id=dn.RELABEL_NODES_TG_NAME):
"""Generate the relabel nodes step """Generate the relabel nodes step
This step uses Drydock to relabel select nodes. This step uses Drydock to relabel select nodes.
@ -221,8 +219,8 @@ class CommonStepFactory(object):
This version of destroying servers does no pre-validations or extra This version of destroying servers does no pre-validations or extra
shutdowns of anything. It unconditionally triggers Drydock to destroy shutdowns of anything. It unconditionally triggers Drydock to destroy
the server. The counterpart to this step is the subdag returned by the the server. The counterpart to this step is the TaskGroup returned by
get_destroy_server method below. the get_destroy_server method below.
""" """
return DrydockDestroyNodeOperator( return DrydockDestroyNodeOperator(
shipyard_conf=config_path, shipyard_conf=config_path,
@ -231,19 +229,14 @@ class CommonStepFactory(object):
on_failure_callback=step_failure_handler, on_failure_callback=step_failure_handler,
dag=self.dag) dag=self.dag)
def get_destroy_server(self, task_id=dn.DESTROY_SERVER_DAG_NAME): def get_destroy_server(self):
"""Generate a destroy server step """Generate a destroy server step
Destroy server tears down kubernetes and hardware Destroy server tears down kubernetes and hardware
""" """
return SubDagOperator( return destroy_server(
subdag=destroy_server( self.dag
self.parent_dag_name, )
task_id,
args=self.default_args),
task_id=task_id,
on_failure_callback=step_failure_handler,
dag=self.dag)
def get_decide_airflow_upgrade(self, task_id=dn.DECIDE_AIRFLOW_UPGRADE): def get_decide_airflow_upgrade(self, task_id=dn.DECIDE_AIRFLOW_UPGRADE):
"""Generate the decide_airflow_upgrade step """Generate the decide_airflow_upgrade step
@ -258,13 +251,13 @@ class CommonStepFactory(object):
worker. The decision will be based on the xcom value that worker. The decision will be based on the xcom value that
is retrieved from the 'armada_post_apply' task is retrieved from the 'armada_post_apply' task
""" """
# DAG ID will be parent + subdag name # DAG ID will be parent
dag_id = self.parent_dag_name + '.' + dn.ARMADA_BUILD_DAG_NAME dag_id = self.parent_dag_name
# Check if Shipyard/Airflow were upgraded by the workflow # Check if Shipyard/Airflow were upgraded by the workflow
upgrade_airflow = kwargs['ti'].xcom_pull( upgrade_airflow = kwargs['ti'].xcom_pull(
key='upgrade_airflow_worker', key='upgrade_airflow_worker',
task_ids='armada_post_apply', task_ids='armada_build.armada_post_apply',
dag_id=dag_id) dag_id=dag_id)
# Go to the branch to upgrade Airflow worker if the Shipyard # Go to the branch to upgrade Airflow worker if the Shipyard

View File

@ -12,17 +12,19 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# Subdags # TaskGroups
ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight' ALL_PREFLIGHT_CHECKS_TG_NAME = 'preflight'
UCP_PREFLIGHT_NAME = 'ucp_preflight_check' UCP_PREFLIGHT_NAME = 'ucp_preflight_check'
ARMADA_BUILD_DAG_NAME = 'armada_build' ARMADA_BUILD_TG_NAME = 'armada_build'
DESTROY_SERVER_DAG_NAME = 'destroy_server' DESTROY_SERVER_TG_NAME = 'destroy_server'
DRYDOCK_BUILD_DAG_NAME = 'drydock_build' DRYDOCK_BUILD_TG_NAME = 'drydock_build'
VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design' VALIDATE_SITE_DESIGN_TG_NAME = 'validate_site_design'
RELABEL_NODES_DAG_NAME = 'relabel_nodes' RELABEL_NODES_TG_NAME = 'relabel_nodes'
# Steps # Steps
ACTION_XCOM = 'action_xcom' ACTION_XCOM = 'action_xcom'
ARMADA_BUILD_ARMADA_POST_APPLY = 'armada_build.armada_post_apply'
ARMADA_BUILD_ARMADA_GET_RELEASES = 'armada_build.armada_get_releases'
ARMADA_TEST_RELEASES = 'armada_test_releases' ARMADA_TEST_RELEASES = 'armada_test_releases'
CONCURRENCY_CHECK = 'dag_concurrency_check' CONCURRENCY_CHECK = 'dag_concurrency_check'
CREATE_ACTION_TAG = 'create_action_tag' CREATE_ACTION_TAG = 'create_action_tag'
@ -39,7 +41,8 @@ FINAL_DEPLOYMENT_STATUS = 'final_deployment_status'
# Define a list of critical steps, used to determine successfulness of a # Define a list of critical steps, used to determine successfulness of a
# still-running DAG # still-running DAG
CRITICAL_DAG_STEPS = [ CRITICAL_DAG_STEPS = [
ARMADA_BUILD_DAG_NAME, ARMADA_BUILD_ARMADA_POST_APPLY,
ARMADA_BUILD_ARMADA_GET_RELEASES,
SKIP_UPGRADE_AIRFLOW, SKIP_UPGRADE_AIRFLOW,
UPGRADE_AIRFLOW, UPGRADE_AIRFLOW,
ARMADA_TEST_RELEASES ARMADA_TEST_RELEASES

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from airflow.models import DAG from airflow.utils.task_group import TaskGroup
try: try:
from airflow.operators import DrydockDestroyNodeOperator from airflow.operators import DrydockDestroyNodeOperator
@ -38,65 +38,56 @@ except ImportError:
from shipyard_airflow.dags.config_path import config_path from shipyard_airflow.dags.config_path import config_path
def destroy_server(parent_dag_name, child_dag_name, args): def destroy_server(dag):
"""DAG to tear down node """DAG to tear down node
The DAG will make use of the promenade and drydock operators The DAG will make use of the promenade and drydock operators
to tear down a target node. to tear down a target node.
""" """
dag = DAG( with TaskGroup(group_id="destroy_server", dag=dag) as destroy_server:
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args,
schedule_interval=None)
# Drain Node # Drain Node
promenade_drain_node = PromenadeDrainNodeOperator( promenade_drain_node = PromenadeDrainNodeOperator(
task_id='promenade_drain_node', task_id='promenade_drain_node',
shipyard_conf=config_path, shipyard_conf=config_path,
main_dag_name=parent_dag_name, dag=dag)
dag=dag)
# Clear Labels # Clear Labels
promenade_clear_labels = PromenadeClearLabelsOperator( promenade_clear_labels = PromenadeClearLabelsOperator(
task_id='promenade_clear_labels', task_id='promenade_clear_labels',
shipyard_conf=config_path, shipyard_conf=config_path,
main_dag_name=parent_dag_name, dag=dag)
dag=dag)
# Shutdown Kubelet # Shutdown Kubelet
promenade_shutdown_kubelet = PromenadeShutdownKubeletOperator( promenade_shutdown_kubelet = PromenadeShutdownKubeletOperator(
task_id='promenade_shutdown_kubelet', task_id='promenade_shutdown_kubelet',
shipyard_conf=config_path, shipyard_conf=config_path,
main_dag_name=parent_dag_name, dag=dag)
dag=dag)
# ETCD Sanity Check # ETCD Sanity Check
promenade_check_etcd = PromenadeCheckEtcdOperator( promenade_check_etcd = PromenadeCheckEtcdOperator(
task_id='promenade_check_etcd', task_id='promenade_check_etcd',
shipyard_conf=config_path, shipyard_conf=config_path,
main_dag_name=parent_dag_name, dag=dag)
dag=dag)
# Power down and destroy node using DryDock # Power down and destroy node using DryDock
drydock_destroy_node = DrydockDestroyNodeOperator( drydock_destroy_node = DrydockDestroyNodeOperator(
task_id='destroy_node', task_id='destroy_node',
shipyard_conf=config_path, shipyard_conf=config_path,
main_dag_name=parent_dag_name, dag=dag)
dag=dag)
# Decommission node from Kubernetes cluster using Promenade # Decommission node from Kubernetes cluster using Promenade
promenade_decommission_node = PromenadeDecommissionNodeOperator( promenade_decommission_node = PromenadeDecommissionNodeOperator(
task_id='promenade_decommission_node', task_id='promenade_decommission_node',
shipyard_conf=config_path, shipyard_conf=config_path,
main_dag_name=parent_dag_name, dag=dag)
dag=dag)
# Define dependencies # Define dependencies
promenade_clear_labels.set_upstream(promenade_drain_node) promenade_clear_labels.set_upstream(promenade_drain_node)
promenade_shutdown_kubelet.set_upstream(promenade_clear_labels) promenade_shutdown_kubelet.set_upstream(promenade_clear_labels)
promenade_check_etcd.set_upstream(promenade_shutdown_kubelet) promenade_check_etcd.set_upstream(promenade_shutdown_kubelet)
drydock_destroy_node.set_upstream(promenade_check_etcd) drydock_destroy_node.set_upstream(promenade_check_etcd)
promenade_decommission_node.set_upstream(drydock_destroy_node) promenade_decommission_node.set_upstream(drydock_destroy_node)
return dag return destroy_server

View File

@ -25,7 +25,7 @@ except ImportError:
def all_preflight_checks(dag): def all_preflight_checks(dag):
''' '''
Pre-Flight Checks Subdag Pre-Flight Checks TaskGroup
''' '''
with TaskGroup(group_id="preflight", dag=dag) as preflight: with TaskGroup(group_id="preflight", dag=dag) as preflight:
''' '''

View File

@ -86,7 +86,7 @@ def get_fake_latest_step_dict_failed():
:rtype: dict :rtype: dict
""" """
return { return {
'armada_build': {'state': 'failed'}, 'armada_build.armada_post_apply': {'state': 'failed'},
'arbitrary_step': {'state': 'success'}, 'arbitrary_step': {'state': 'success'},
'another_arbitrary_step': {'state': 'running'}, 'another_arbitrary_step': {'state': 'running'},
'upgrade_airflow': {'state': 'success'}, 'upgrade_airflow': {'state': 'success'},