From d5e21c4b184945b161d2a8b6e3f5f7e55e789823 Mon Sep 17 00:00:00 2001 From: Anthony Lin Date: Mon, 12 Feb 2018 07:05:11 +0000 Subject: [PATCH] Refactor Promenade Operator We will move away from the usage of if/else block and will instead make use of inheritance (promenade base operator) and task specific operators in our dags to execute the workflow Change-Id: Id32592d4f5f2d32e479344c7b859493b6bb450cb --- shipyard_airflow/dags/destroy_node.py | 50 ++--- .../plugins/promenade_base_operator.py | 126 +++++++++++ .../plugins/promenade_check_etcd.py | 52 +++++ .../plugins/promenade_clear_labels.py | 54 +++++ .../plugins/promenade_decommission_node.py | 54 +++++ .../plugins/promenade_drain_node.py | 56 +++++ .../plugins/promenade_operator.py | 204 ------------------ .../plugins/promenade_shutdown_kubelet.py | 54 +++++ 8 files changed, 422 insertions(+), 228 deletions(-) create mode 100644 shipyard_airflow/plugins/promenade_base_operator.py create mode 100644 shipyard_airflow/plugins/promenade_check_etcd.py create mode 100644 shipyard_airflow/plugins/promenade_clear_labels.py create mode 100644 shipyard_airflow/plugins/promenade_decommission_node.py create mode 100644 shipyard_airflow/plugins/promenade_drain_node.py delete mode 100644 shipyard_airflow/plugins/promenade_operator.py create mode 100644 shipyard_airflow/plugins/promenade_shutdown_kubelet.py diff --git a/shipyard_airflow/dags/destroy_node.py b/shipyard_airflow/dags/destroy_node.py index 0fefb4cf..4c8ade20 100644 --- a/shipyard_airflow/dags/destroy_node.py +++ b/shipyard_airflow/dags/destroy_node.py @@ -14,7 +14,11 @@ from airflow.models import DAG from airflow.operators import DryDockOperator -from airflow.operators import PromenadeOperator +from airflow.operators import PromenadeCheckEtcdOperator +from airflow.operators import PromenadeClearLabelsOperator +from airflow.operators import PromenadeDecommissionNodeOperator +from airflow.operators import PromenadeDrainNodeOperator +from airflow.operators import 
PromenadeShutdownKubeletOperator # Location of shiyard.conf @@ -24,45 +28,44 @@ config_path = '/usr/local/airflow/plugins/shipyard.conf' def destroy_server(parent_dag_name, child_dag_name, args): - ''' - Tear Down Node - ''' + """DAG to tear down node + + The DAG will make use of the promenade and drydock operators + to tear down a target node. + + """ dag = DAG( '{}.{}'.format(parent_dag_name, child_dag_name), default_args=args) # Drain Node - promenade_drain_node = PromenadeOperator( + promenade_drain_node = PromenadeDrainNodeOperator( task_id='promenade_drain_node', shipyard_conf=config_path, - action='promenade_drain_node', main_dag_name=parent_dag_name, sub_dag_name=child_dag_name, dag=dag) - # Remove Labels - promenade_remove_labels = PromenadeOperator( - task_id='promenade_remove_labels', + # Clear Labels + promenade_clear_labels = PromenadeClearLabelsOperator( + task_id='promenade_clear_labels', shipyard_conf=config_path, - action='promenade_remove_labels', main_dag_name=parent_dag_name, sub_dag_name=child_dag_name, dag=dag) - # Stop Kubelet - promenade_stop_kubelet = PromenadeOperator( - task_id='promenade_stop_kubelet', + # Shutdown Kubelet + promenade_shutdown_kubelet = PromenadeShutdownKubeletOperator( + task_id='promenade_shutdown_kubelet', shipyard_conf=config_path, - action='promenade_stop_kubelet', main_dag_name=parent_dag_name, sub_dag_name=child_dag_name, dag=dag) # ETCD Sanity Check - promenade_check_etcd = PromenadeOperator( + promenade_check_etcd = PromenadeCheckEtcdOperator( task_id='promenade_check_etcd', shipyard_conf=config_path, - action='promenade_check_etcd', main_dag_name=parent_dag_name, sub_dag_name=child_dag_name, dag=dag) @@ -76,20 +79,19 @@ def destroy_server(parent_dag_name, child_dag_name, args): sub_dag_name=child_dag_name, dag=dag) - # Delete node from cluster using Promenade - promenade_delete_node = PromenadeOperator( - task_id='promenade_delete_node', + # Decommission node from Kubernetes cluster using Promenade + 
promenade_decommission_node = PromenadeDecommissionNodeOperator( + task_id='promenade_decommission_node', shipyard_conf=config_path, - action='promenade_delete_node', main_dag_name=parent_dag_name, sub_dag_name=child_dag_name, dag=dag) # Define dependencies - promenade_remove_labels.set_upstream(promenade_drain_node) - promenade_stop_kubelet.set_upstream(promenade_remove_labels) - promenade_check_etcd.set_upstream(promenade_stop_kubelet) + promenade_clear_labels.set_upstream(promenade_drain_node) + promenade_shutdown_kubelet.set_upstream(promenade_clear_labels) + promenade_check_etcd.set_upstream(promenade_shutdown_kubelet) drydock_destroy_node.set_upstream(promenade_check_etcd) - promenade_delete_node.set_upstream(drydock_destroy_node) + promenade_decommission_node.set_upstream(drydock_destroy_node) return dag diff --git a/shipyard_airflow/plugins/promenade_base_operator.py b/shipyard_airflow/plugins/promenade_base_operator.py new file mode 100644 index 00000000..39637a0e --- /dev/null +++ b/shipyard_airflow/plugins/promenade_base_operator.py @@ -0,0 +1,126 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging + +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults +from airflow.plugins_manager import AirflowPlugin +from airflow.exceptions import AirflowException + +from service_endpoint import ucp_service_endpoint +from service_token import shipyard_service_token + + +class PromenadeBaseOperator(BaseOperator): + + """Promenade Base Operator + + All promenade related workflow operators will use the promenade + base operator as the parent and inherit attributes and methods + from this class + + """ + + @apply_defaults + def __init__(self, + main_dag_name=None, + promenade_svc_endpoint=None, + promenade_svc_type='kubernetesprovisioner', + redeploy_server=None, + shipyard_conf=None, + sub_dag_name=None, + svc_token=None, + workflow_info={}, + xcom_push=True, + *args, **kwargs): + """Initialization of PromenadeBaseOperator object. + + :param main_dag_name: Parent Dag + :param promenade_svc_endpoint: Promenade Service Endpoint + :param promenade_svc_type: Promenade Service Type + :param redeploy_server: Server to be redeployed + :param shipyard_conf: Path of shipyard.conf + :param sub_dag_name: Child Dag + :param svc_token: Keystone Token + :param workflow_info: Information related to current workflow + :param xcom_push: xcom usage + + """ + + super(PromenadeBaseOperator, self).__init__(*args, + **kwargs) + self.main_dag_name = main_dag_name + self.promenade_svc_endpoint = promenade_svc_endpoint + self.promenade_svc_type = promenade_svc_type + self.redeploy_server = redeploy_server + self.shipyard_conf = shipyard_conf + self.sub_dag_name = sub_dag_name + self.svc_token = svc_token + self.workflow_info = workflow_info + self.xcom_push_flag = xcom_push + + def execute(self, context): + + # Execute promenade base function + self.promenade_base(context) + + # Execute child function + self.do_execute() + + @shipyard_service_token + def promenade_base(self, context): + # Define task_instance + task_instance = 
context['task_instance'] + + # Extract information related to current workflow + # The workflow_info variable will be a dictionary + # that contains information about the workflow such + # as action_id, name and other related parameters + self.workflow_info = task_instance.xcom_pull( + task_ids='action_xcom', key='action', + dag_id=self.main_dag_name) + + # Logs uuid of Shipyard action + logging.info("Executing Shipyard Action %s", + self.workflow_info['id']) + + # Retrieve information of the server that we want to redeploy + # if user executes the 'redeploy_server' dag + if self.workflow_info['dag_id'] == 'redeploy_server': + self.redeploy_server = self.workflow_info['parameters'].get( + 'server-name') + + if self.redeploy_server: + logging.info("Server to be redeployed is %s", + self.redeploy_server) + else: + raise AirflowException('%s was unable to retrieve the ' + 'server to be redeployed.' + % self.__class__.__name__) + + # Retrieve promenade endpoint + self.promenade_svc_endpoint = ucp_service_endpoint( + self, svc_type=self.promenade_svc_type) + + logging.info("Promenade endpoint is %s", + self.promenade_svc_endpoint) + + +class PromenadeBaseOperatorPlugin(AirflowPlugin): + + """Creates PromenadeBaseOperator in Airflow.""" + + name = 'promenade_base_operator_plugin' + operators = [PromenadeBaseOperator] diff --git a/shipyard_airflow/plugins/promenade_check_etcd.py b/shipyard_airflow/plugins/promenade_check_etcd.py new file mode 100644 index 00000000..d4f5e200 --- /dev/null +++ b/shipyard_airflow/plugins/promenade_check_etcd.py @@ -0,0 +1,52 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time + +from airflow.plugins_manager import AirflowPlugin +from airflow.exceptions import AirflowException + +from promenade_base_operator import PromenadeBaseOperator + + +class PromenadeCheckEtcdOperator(PromenadeBaseOperator): + + """Promenade Check ETCD Operator + + This operator will trigger promenade to retrieve the current + state of etcd. + + """ + + def do_execute(self): + # Placeholder function. Updates will be made when the Promenade + # API is ready for consumption. + logging.info("Performing health check on etcd...") + time.sleep(5) + + check_etcd = True + + if check_etcd: + logging.info("The etcd cluster is healthy and ready") + else: + raise AirflowException('Please check the state of etcd!') + + +class PromenadeCheckEtcdOperatorPlugin(AirflowPlugin): + + """Creates PromenadeCheckEtcdOperator in Airflow.""" + + name = 'promenade_check_etcd_operator' + operators = [PromenadeCheckEtcdOperator] diff --git a/shipyard_airflow/plugins/promenade_clear_labels.py b/shipyard_airflow/plugins/promenade_clear_labels.py new file mode 100644 index 00000000..526f8b78 --- /dev/null +++ b/shipyard_airflow/plugins/promenade_clear_labels.py @@ -0,0 +1,54 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time + +from airflow.plugins_manager import AirflowPlugin +from airflow.exceptions import AirflowException + +from promenade_base_operator import PromenadeBaseOperator + + +class PromenadeClearLabelsOperator(PromenadeBaseOperator): + + """Promenade Clear Labels Operator + + This operator will trigger promenade to clear the labels on + the target node + + """ + + def do_execute(self): + # Placeholder function. Updates will be made when the Promenade + # API is ready for consumption. + logging.info("Removing labels on node...") + time.sleep(5) + + labels_removed = True + + if labels_removed: + logging.info("Successfully removed labels on %s", + self.redeploy_server) + else: + raise AirflowException('Failed to remove labels on %s!', + self.redeploy_server) + + +class PromenadeClearLabelsOperatorPlugin(AirflowPlugin): + + """Creates PromenadeClearLabelsOperator in Airflow.""" + + name = 'promenade_clear_labels_operator' + operators = [PromenadeClearLabelsOperator] diff --git a/shipyard_airflow/plugins/promenade_decommission_node.py b/shipyard_airflow/plugins/promenade_decommission_node.py new file mode 100644 index 00000000..74664b31 --- /dev/null +++ b/shipyard_airflow/plugins/promenade_decommission_node.py @@ -0,0 +1,54 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time + +from airflow.plugins_manager import AirflowPlugin +from airflow.exceptions import AirflowException + +from promenade_base_operator import PromenadeBaseOperator + + +class PromenadeDecommissionNodeOperator(PromenadeBaseOperator): + + """Promenade Decommission Node Operator + + This operator will trigger promenade to perform steps to + clean up the target node from the Kubernetes cluster + + """ + + def do_execute(self): + # Placeholder function. Updates will be made when the Promenade + # API is ready for consumption. + logging.info("Decommissioning node from Kubernetes cluster...") + time.sleep(5) + + decommission_node = True + + if decommission_node: + logging.info("Succesfully decommissioned node %s", + self.redeploy_server) + else: + raise AirflowException('Failed to decommission node %s!', + self.redeploy_server) + + +class PromenadeDecommissionNodeOperatorPlugin(AirflowPlugin): + + """Creates PromenadeDecommissionNodeOperator in Airflow.""" + + name = 'promenade_decommission_node_operator' + operators = [PromenadeDecommissionNodeOperator] diff --git a/shipyard_airflow/plugins/promenade_drain_node.py b/shipyard_airflow/plugins/promenade_drain_node.py new file mode 100644 index 00000000..7fea0c28 --- /dev/null +++ b/shipyard_airflow/plugins/promenade_drain_node.py @@ -0,0 +1,56 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time + +from airflow.plugins_manager import AirflowPlugin +from airflow.exceptions import AirflowException + +from promenade_base_operator import PromenadeBaseOperator + + +class PromenadeDrainNodeOperator(PromenadeBaseOperator): + + """Promenade Drain Node Operator + + This operator will trigger promenade to drain the target + node and ensure that the node is no longer the target of + any pod scheduling. Promenade will evict or delete any + running pod on the node. + + """ + + def do_execute(self): + # Placeholder function. Updates will be made when the Promenade + # API is ready for consumption. + logging.info("Draining node...") + time.sleep(5) + + node_drained = True + + if node_drained: + logging.info("Node %s has been successfully drained", + self.redeploy_server) + else: + raise AirflowException('Failed to drain %s!', + self.redeploy_server) + + +class PromenadeDrainNodeOperatorPlugin(AirflowPlugin): + + """Creates PromenadeDrainNodeOperator in Airflow.""" + + name = 'promenade_drain_node_operator' + operators = [PromenadeDrainNodeOperator] diff --git a/shipyard_airflow/plugins/promenade_operator.py b/shipyard_airflow/plugins/promenade_operator.py deleted file mode 100644 index 3c13048f..00000000 --- a/shipyard_airflow/plugins/promenade_operator.py +++ /dev/null @@ -1,204 +0,0 @@ -# Copyright 2018 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import time - -from airflow.models import BaseOperator -from airflow.utils.decorators import apply_defaults -from airflow.plugins_manager import AirflowPlugin -from airflow.exceptions import AirflowException - -from service_endpoint import ucp_service_endpoint -from service_token import shipyard_service_token - - -class PromenadeOperator(BaseOperator): - """ - Supports interaction with Promenade - :param action: Task to perform - :param main_dag_name: Parent Dag - :param shipyard_conf: Location of shipyard.conf - :param sub_dag_name: Child Dag - """ - - @apply_defaults - def __init__(self, - action=None, - main_dag_name=None, - shipyard_conf=None, - sub_dag_name=None, - svc_token=None, - workflow_info={}, - xcom_push=True, - *args, **kwargs): - - super(PromenadeOperator, self).__init__(*args, **kwargs) - self.action = action - self.main_dag_name = main_dag_name - self.shipyard_conf = shipyard_conf - self.sub_dag_name = sub_dag_name - self.svc_token = svc_token - self.workflow_info = workflow_info - self.xcom_push_flag = xcom_push - - def execute(self, context): - # Initialize Variables - check_etcd = False - delete_node = False - labels_removed = False - node_drained = False - redeploy_server = None - stop_kubelet = False - - # Define task_instance - task_instance = context['task_instance'] - - # Extract information related to current workflow - # The workflow_info variable will be a dictionary - # that contains information about the workflow such - # as action_id, name and other related parameters - workflow_info = 
task_instance.xcom_pull( - task_ids='action_xcom', key='action', - dag_id=self.main_dag_name) - - # Logs uuid of action performed by the Operator - logging.info("Promenade Operator for action %s", workflow_info['id']) - - # Retrieve information of the server that we want to redeploy if user - # executes the 'redeploy_server' dag - if workflow_info['dag_id'] == 'redeploy_server': - redeploy_server = workflow_info['parameters'].get('server-name') - - if redeploy_server: - logging.info("Server to be redeployed is %s", redeploy_server) - else: - raise AirflowException('Unable to retrieve information of ' - 'node to be redeployed!') - - # Retrieve Endpoint Information - svc_type = 'kubernetesprovisioner' - promenade_svc_endpoint = ucp_service_endpoint(self, - svc_type=svc_type) - - logging.info("Promenade endpoint is %s", promenade_svc_endpoint) - - # Promenade API Call - # Drain node using Promenade - if self.action == 'promenade_drain_node': - node_drained = self.promenade_drain_node(redeploy_server) - - if node_drained: - logging.info("Node %s has been successfully drained", - redeploy_server) - else: - raise AirflowException('Failed to drain %s!', - redeploy_server) - - # Remove labels using Promenade - elif self.action == 'promenade_remove_labels': - labels_removed = self.promenade_remove_labels(redeploy_server) - - if labels_removed: - logging.info("Successfully removed labels on %s", - redeploy_server) - else: - raise AirflowException('Failed to remove labels on %s!', - redeploy_server) - - # Stops kubelet on node using Promenade - elif self.action == 'promenade_stop_kubelet': - stop_kubelet = self.promenade_stop_kubelet(redeploy_server) - - if stop_kubelet: - logging.info("Successfully stopped kubelet on %s", - redeploy_server) - else: - raise AirflowException('Failed to stopped kubelet on %s!', - redeploy_server) - - # Performs etcd sanity check using Promenade - elif self.action == 'promenade_check_etcd': - check_etcd = self.promenade_check_etcd() - - if 
check_etcd: - logging.info("The etcd cluster is healthy and ready") - else: - raise AirflowException('Please check the state of etcd!') - - # Delete node from cluster using Promenade - elif self.action == 'promenade_delete_node': - delete_node = self.promenade_delete_node(redeploy_server) - - if delete_node: - logging.info("Succesfully deleted node %s from cluster", - redeploy_server) - else: - raise AirflowException('Failed to node %s from cluster!', - redeploy_server) - - # No action to perform - else: - logging.info('No Action to Perform') - - @shipyard_service_token - def promenade_drain_node(self, redeploy_server): - # Placeholder function. Updates will be made when the Promenade - # API is ready for consumption. - logging.info("The token is %s", self.svc_token) - logging.info("Draining node...") - time.sleep(15) - - return True - - @shipyard_service_token - def promenade_remove_labels(self, redeploy_server): - # Placeholder function. Updates will be made when the Promenade - # API is ready for consumption. - logging.info("Removing labels on node...") - time.sleep(15) - - return True - - @shipyard_service_token - def promenade_stop_kubelet(self, redeploy_server): - # Placeholder function. Updates will be made when the Promenade - # API is ready for consumption. - logging.info("Stopping kubelet on node...") - time.sleep(15) - - return True - - @shipyard_service_token - def promenade_check_etcd(self): - # Placeholder function. Updates will be made when the Promenade - # API is ready for consumption. - logging.info("Performing health check on etcd...") - time.sleep(15) - - return True - - @shipyard_service_token - def promenade_delete_node(self, redeploy_server): - # Placeholder function. Updates will be made when the Promenade - # API is ready for consumption. 
- logging.info("Deleting node from cluster...") - time.sleep(15) - - return True - - -class PromenadeOperatorPlugin(AirflowPlugin): - name = 'promenade_operator_plugin' - operators = [PromenadeOperator] diff --git a/shipyard_airflow/plugins/promenade_shutdown_kubelet.py b/shipyard_airflow/plugins/promenade_shutdown_kubelet.py new file mode 100644 index 00000000..774c731c --- /dev/null +++ b/shipyard_airflow/plugins/promenade_shutdown_kubelet.py @@ -0,0 +1,54 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time + +from airflow.plugins_manager import AirflowPlugin +from airflow.exceptions import AirflowException + +from promenade_base_operator import PromenadeBaseOperator + + +class PromenadeShutdownKubeletOperator(PromenadeBaseOperator): + + """Promenade Shutdown Kubelet Operator + + This operator will trigger promenade to shut down kubelet + on the target node. + + """ + + def do_execute(self): + # Placeholder function. Updates will be made when the Promenade + # API is ready for consumption. 
+ logging.info("Shutting down kubelet on node...") + time.sleep(5) + + shutdown_kubelet = True + + if shutdown_kubelet: + logging.info("Successfully shut down kubelet on %s", + self.redeploy_server) + else: + raise AirflowException('Failed to shut down kubelet on %s!', + self.redeploy_server) + + +class PromenadeShutdownKubeletOperatorPlugin(AirflowPlugin): + + """Creates PromenadeShutdownKubeletOperator in Airflow.""" + + name = 'promenade_shutdown_kubelet_operator' + operators = [PromenadeShutdownKubeletOperator]