diff --git a/charts/shipyard/values.yaml b/charts/shipyard/values.yaml
index 15a7698f..84bfb998 100644
--- a/charts/shipyard/values.yaml
+++ b/charts/shipyard/values.yaml
@@ -328,6 +328,8 @@ conf:
       prepare_node_task_timeout: 1800
       deploy_node_query_interval: 30
       deploy_node_task_timeout: 3600
+      destroy_node_query_interval: 30
+      destroy_node_task_timeout: 900
       cluster_join_check_backoff_time: 120
     keystone_authtoken:
       delay_auth_decision: true
diff --git a/etc/shipyard/shipyard.conf.sample b/etc/shipyard/shipyard.conf.sample
index dc17fe6f..2d3097f5 100644
--- a/etc/shipyard/shipyard.conf.sample
+++ b/etc/shipyard/shipyard.conf.sample
@@ -82,6 +82,12 @@
 # Time out (in seconds) for deploy_node task (integer value)
 #deploy_node_task_timeout = 3600
 
+# Query interval (in seconds) for destroy_node task (integer value)
+#destroy_node_query_interval = 30
+
+# Time out (in seconds) for destroy_node task (integer value)
+#destroy_node_task_timeout = 900
+
 # Backoff time (in seconds) before checking cluster join (integer value)
 #cluster_join_check_backoff_time = 120
 
diff --git a/shipyard_airflow/conf/config.py b/shipyard_airflow/conf/config.py
index afc42375..9b2cbbd1 100644
--- a/shipyard_airflow/conf/config.py
+++ b/shipyard_airflow/conf/config.py
@@ -170,6 +170,16 @@ SECTIONS = [
                 default=3600,
                 help='Time out (in seconds) for deploy_node task'
             ),
+            cfg.IntOpt(
+                'destroy_node_query_interval',
+                default=30,
+                help='Query interval (in seconds) for destroy_node task'
+            ),
+            cfg.IntOpt(
+                'destroy_node_task_timeout',
+                default=900,
+                help='Time out (in seconds) for destroy_node task'
+            ),
             cfg.IntOpt(
                 'cluster_join_check_backoff_time',
                 default=120,
diff --git a/shipyard_airflow/dags/destroy_node.py b/shipyard_airflow/dags/destroy_node.py
new file mode 100644
index 00000000..0fefb4cf
--- /dev/null
+++ b/shipyard_airflow/dags/destroy_node.py
@@ -0,0 +1,95 @@
+# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.models import DAG
+from airflow.operators import DryDockOperator
+from airflow.operators import PromenadeOperator
+
+
+# Location of shipyard.conf
+# Note that the shipyard.conf file needs to be placed on a volume
+# that can be accessed by the containers
+config_path = '/usr/local/airflow/plugins/shipyard.conf'
+
+
+def destroy_server(parent_dag_name, child_dag_name, args):
+    '''
+    Tear Down Node
+    '''
+    dag = DAG(
+        '{}.{}'.format(parent_dag_name, child_dag_name),
+        default_args=args)
+
+    # Drain Node
+    promenade_drain_node = PromenadeOperator(
+        task_id='promenade_drain_node',
+        shipyard_conf=config_path,
+        action='promenade_drain_node',
+        main_dag_name=parent_dag_name,
+        sub_dag_name=child_dag_name,
+        dag=dag)
+
+    # Remove Labels
+    promenade_remove_labels = PromenadeOperator(
+        task_id='promenade_remove_labels',
+        shipyard_conf=config_path,
+        action='promenade_remove_labels',
+        main_dag_name=parent_dag_name,
+        sub_dag_name=child_dag_name,
+        dag=dag)
+
+    # Stop Kubelet
+    promenade_stop_kubelet = PromenadeOperator(
+        task_id='promenade_stop_kubelet',
+        shipyard_conf=config_path,
+        action='promenade_stop_kubelet',
+        main_dag_name=parent_dag_name,
+        sub_dag_name=child_dag_name,
+        dag=dag)
+
+    # ETCD Sanity Check
+    promenade_check_etcd = PromenadeOperator(
+        task_id='promenade_check_etcd',
+        shipyard_conf=config_path,
+        action='promenade_check_etcd',
+        main_dag_name=parent_dag_name,
+        sub_dag_name=child_dag_name,
+        dag=dag)
+
+    # Power down and destroy node using DryDock
+    drydock_destroy_node = DryDockOperator(
+        task_id='destroy_node',
+        shipyard_conf=config_path,
+        action='destroy_node',
+        main_dag_name=parent_dag_name,
+        sub_dag_name=child_dag_name,
+        dag=dag)
+
+    # Delete node from cluster using Promenade
+    promenade_delete_node = PromenadeOperator(
+        task_id='promenade_delete_node',
+        shipyard_conf=config_path,
+        action='promenade_delete_node',
+        main_dag_name=parent_dag_name,
+        sub_dag_name=child_dag_name,
+        dag=dag)
+
+    # Define dependencies
+    promenade_remove_labels.set_upstream(promenade_drain_node)
+    promenade_stop_kubelet.set_upstream(promenade_remove_labels)
+    promenade_check_etcd.set_upstream(promenade_stop_kubelet)
+    drydock_destroy_node.set_upstream(promenade_check_etcd)
+    promenade_delete_node.set_upstream(drydock_destroy_node)
+
+    return dag
diff --git a/shipyard_airflow/dags/redeploy_server.py b/shipyard_airflow/dags/redeploy_server.py
index a59ff191..5d0d25aa 100644
--- a/shipyard_airflow/dags/redeploy_server.py
+++ b/shipyard_airflow/dags/redeploy_server.py
@@ -14,24 +14,28 @@
 from datetime import timedelta
 
 import airflow
-from airflow import DAG
 import failure_handlers
+from airflow import DAG
+from airflow.operators import ConcurrencyCheckOperator
+from airflow.operators.python_operator import PythonOperator
+from airflow.operators.subdag_operator import SubDagOperator
+
+from deckhand_get_design import get_design_deckhand
+from destroy_node import destroy_server
+from drydock_deploy_site import deploy_site_drydock
 from preflight_checks import all_preflight_checks
 from validate_site_design import validate_site_design
-from airflow.operators.subdag_operator import SubDagOperator
-from airflow.operators import ConcurrencyCheckOperator
-from airflow.operators import DeckhandOperator
-from airflow.operators import PlaceholderOperator
-from airflow.operators.python_operator import PythonOperator
 """
 redeploy_server is the top-level orchestration DAG for redeploying a
 server using the Undercloud platform.
""" -PARENT_DAG_NAME = 'redeploy_server' -DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check' ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight' +DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check' DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version' +DESTROY_SERVER_DAG_NAME = 'destroy_server' +DRYDOCK_BUILD_DAG_NAME = 'drydock_build' +PARENT_DAG_NAME = 'redeploy_server' VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design' default_args = { @@ -43,7 +47,7 @@ default_args = { 'email_on_retry': False, 'provide_context': True, 'retries': 0, - 'retry_delay': timedelta(minutes=1), + 'retry_delay': timedelta(seconds=30), } dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None) @@ -73,9 +77,11 @@ preflight = SubDagOperator( PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME, args=default_args), task_id=ALL_PREFLIGHT_CHECKS_DAG_NAME, on_failure_callback=failure_handlers.step_failure_handler, - dag=dag, ) + dag=dag) -get_design_version = DeckhandOperator( +get_design_version = SubDagOperator( + subdag=get_design_deckhand( + PARENT_DAG_NAME, DECKHAND_GET_DESIGN_VERSION, args=default_args), task_id=DECKHAND_GET_DESIGN_VERSION, on_failure_callback=failure_handlers.step_failure_handler, dag=dag) @@ -87,23 +93,17 @@ validate_site_design = SubDagOperator( on_failure_callback=failure_handlers.step_failure_handler, dag=dag) -site_evacuation = PlaceholderOperator( - task_id='site_evacuation', +destroy_server = SubDagOperator( + subdag=destroy_server( + PARENT_DAG_NAME, DESTROY_SERVER_DAG_NAME, args=default_args), + task_id=DESTROY_SERVER_DAG_NAME, on_failure_callback=failure_handlers.step_failure_handler, dag=dag) -drydock_rebuild = PlaceholderOperator( - task_id='drydock_rebuild', - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -query_node_status = PlaceholderOperator( - task_id='redeployed_node_status', - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -armada_rebuild = PlaceholderOperator( - task_id='armada_rebuild', +drydock_build = SubDagOperator( + subdag=deploy_site_drydock( + PARENT_DAG_NAME, DRYDOCK_BUILD_DAG_NAME, args=default_args), + task_id=DRYDOCK_BUILD_DAG_NAME, on_failure_callback=failure_handlers.step_failure_handler, dag=dag) @@ -112,7 +112,5 @@ concurrency_check.set_upstream(action_xcom) preflight.set_upstream(concurrency_check) get_design_version.set_upstream(preflight) validate_site_design.set_upstream(get_design_version) -site_evacuation.set_upstream(validate_site_design) -drydock_rebuild.set_upstream(site_evacuation) -query_node_status.set_upstream(drydock_rebuild) -armada_rebuild.set_upstream(query_node_status) +destroy_server.set_upstream(validate_site_design) +drydock_build.set_upstream(destroy_server) diff --git a/shipyard_airflow/plugins/deckhand_operator.py b/shipyard_airflow/plugins/deckhand_operator.py index 377ea1c4..e262c8ba 100644 --- a/shipyard_airflow/plugins/deckhand_operator.py +++ b/shipyard_airflow/plugins/deckhand_operator.py @@ -56,6 +56,7 @@ class DeckhandOperator(BaseOperator): def execute(self, context): # Initialize Variables deckhand_design_version = None + redeploy_server = None # Define task_instance task_instance = context['task_instance'] @@ -71,6 +72,17 @@ class DeckhandOperator(BaseOperator): # Logs uuid of action performed by the Operator logging.info("DeckHand Operator for action %s", workflow_info['id']) + # Retrieve information of the server that we want to redeploy if user + # executes the 'redeploy_server' dag + if workflow_info['dag_id'] == 'redeploy_server': + 
redeploy_server = workflow_info['parameters'].get('server-name') + + if redeploy_server: + logging.info("Server to be redeployed is %s", redeploy_server) + else: + raise AirflowException('Unable to retrieve information of ' + 'node to be redeployed!') + # Retrieve Endpoint Information svc_type = 'deckhand' context['svc_endpoint'] = ucp_service_endpoint(self, diff --git a/shipyard_airflow/plugins/drydock_operators.py b/shipyard_airflow/plugins/drydock_operators.py index d71983ca..ca2ed582 100644 --- a/shipyard_airflow/plugins/drydock_operators.py +++ b/shipyard_airflow/plugins/drydock_operators.py @@ -68,6 +68,8 @@ class DryDockOperator(BaseOperator): self.xcom_push_flag = xcom_push def execute(self, context): + # Initialize Variable + redeploy_server = None # Placeholder definition # TODO: Need to decide how to pass the required value from Shipyard to @@ -88,6 +90,19 @@ class DryDockOperator(BaseOperator): # Logs uuid of action performed by the Operator logging.info("DryDock Operator for action %s", workflow_info['id']) + # Retrieve information of the server that we want to redeploy if user + # executes the 'redeploy_server' dag + # Set node filter to be the server that we want to redeploy + if workflow_info['dag_id'] == 'redeploy_server': + redeploy_server = workflow_info['parameters'].get('server-name') + + if redeploy_server: + logging.info("Server to be redeployed is %s", redeploy_server) + self.node_filter = redeploy_server + else: + raise AirflowException('Unable to retrieve information of ' + 'node to be redeployed!') + # Retrieve Deckhand Design Reference self.design_ref = self.get_deckhand_design_ref(context) @@ -188,6 +203,26 @@ class DryDockOperator(BaseOperator): # polling interval to 30 seconds. check_node_status(1800, 30) + # Create Task for destroy_node + # NOTE: This is a PlaceHolder function. The 'destroy_node' + # functionalities in DryDock is being worked on and is not + # ready at the moment. + elif self.action == 'destroy_node': + # Default settings for 'destroy_node' execution is to query + # the task every 30 seconds and to time out after 900 seconds + query_interval = config.get('drydock', + 'destroy_node_query_interval') + task_timeout = config.get('drydock', 'destroy_node_task_timeout') + + logging.info("Destroying node %s from cluster...", redeploy_server) + time.sleep(30) + logging.info("Successfully deleted node %s", redeploy_server) + + # TODO: Uncomment when the function to destroy/delete node is + # ready for consumption in Drydock + # self.drydock_action(drydock_client, context, self.action, + # query_interval, task_timeout) + # Do not perform any action else: logging.info('No Action to Perform') @@ -235,7 +270,7 @@ class DryDockOperator(BaseOperator): # Trigger DryDock to execute task and retrieve task ID task_id = self.drydock_perform_task(drydock_client, context, - action, None) + action, self.node_filter) logging.info('Task ID is %s', task_id) diff --git a/shipyard_airflow/plugins/promenade_operator.py b/shipyard_airflow/plugins/promenade_operator.py new file mode 100644 index 00000000..f9c46a2b --- /dev/null +++ b/shipyard_airflow/plugins/promenade_operator.py @@ -0,0 +1,201 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import time
+
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.plugins_manager import AirflowPlugin
+from airflow.exceptions import AirflowException
+
+from service_endpoint import ucp_service_endpoint
+from service_token import shipyard_service_token
+
+
+class PromenadeOperator(BaseOperator):
+    """
+    Supports interaction with Promenade
+    :param action: Task to perform
+    :param main_dag_name: Parent Dag
+    :param shipyard_conf: Location of shipyard.conf
+    :param sub_dag_name: Child Dag
+    """
+
+    @apply_defaults
+    def __init__(self,
+                 action=None,
+                 main_dag_name=None,
+                 shipyard_conf=None,
+                 sub_dag_name=None,
+                 workflow_info={},
+                 xcom_push=True,
+                 *args, **kwargs):
+
+        super(PromenadeOperator, self).__init__(*args, **kwargs)
+        self.action = action
+        self.main_dag_name = main_dag_name
+        self.shipyard_conf = shipyard_conf
+        self.sub_dag_name = sub_dag_name
+        self.workflow_info = workflow_info
+        self.xcom_push_flag = xcom_push
+
+    def execute(self, context):
+        # Initialize Variables
+        check_etcd = False
+        delete_node = False
+        labels_removed = False
+        node_drained = False
+        redeploy_server = None
+        stop_kubelet = False
+
+        # Define task_instance
+        task_instance = context['task_instance']
+
+        # Extract information related to current workflow
+        # The workflow_info variable will be a dictionary
+        # that contains information about the workflow such
+        # as action_id, name and other related parameters
+        workflow_info = task_instance.xcom_pull(
+            task_ids='action_xcom', key='action',
+            dag_id=self.main_dag_name)
+
+        # Logs uuid of action performed by the Operator
+        logging.info("Promenade Operator for action %s", workflow_info['id'])
+
+        # Retrieve information of the server that we want to redeploy if the
+        # user executes the 'redeploy_server' dag
+        if workflow_info['dag_id'] == 'redeploy_server':
+            redeploy_server = workflow_info['parameters'].get('server-name')
+
+            if redeploy_server:
+                logging.info("Server to be redeployed is %s", redeploy_server)
+            else:
+                raise AirflowException('Unable to retrieve information of '
+                                       'node to be redeployed!')
+
+        # Retrieve Endpoint Information
+        svc_type = 'kubernetesprovisioner'
+        context['svc_endpoint'] = ucp_service_endpoint(self,
+                                                       svc_type=svc_type)
+        logging.info("Promenade endpoint is %s", context['svc_endpoint'])
+
+        # Promenade API Call
+        # Drain node using Promenade
+        if self.action == 'promenade_drain_node':
+            node_drained = self.promenade_drain_node(context,
+                                                     redeploy_server)
+
+            if node_drained:
+                logging.info("Node %s has been successfully drained",
+                             redeploy_server)
+            else:
+                raise AirflowException('Failed to drain %s!' %
+                                       redeploy_server)
+
+        # Remove labels using Promenade
+        elif self.action == 'promenade_remove_labels':
+            labels_removed = self.promenade_remove_labels(context,
+                                                          redeploy_server)
+
+            if labels_removed:
+                logging.info("Successfully removed labels on %s",
+                             redeploy_server)
+            else:
+                raise AirflowException('Failed to remove labels on %s!' %
+                                       redeploy_server)
+
+        # Stop kubelet on node using Promenade
+        elif self.action == 'promenade_stop_kubelet':
+            stop_kubelet = self.promenade_stop_kubelet(context,
+                                                       redeploy_server)
+
+            if stop_kubelet:
+                logging.info("Successfully stopped kubelet on %s",
+                             redeploy_server)
+            else:
+                raise AirflowException('Failed to stop kubelet on %s!' %
+                                       redeploy_server)
+
+        # Perform etcd sanity check using Promenade
+        elif self.action == 'promenade_check_etcd':
+            check_etcd = self.promenade_check_etcd(context)
+
+            if check_etcd:
+                logging.info("The etcd cluster is healthy and ready")
+            else:
+                raise AirflowException('Please check the state of etcd!')
+
+        # Delete node from cluster using Promenade
+        elif self.action == 'promenade_delete_node':
+            delete_node = self.promenade_delete_node(context,
+                                                     redeploy_server)
+
+            if delete_node:
+                logging.info("Successfully deleted node %s from cluster",
+                             redeploy_server)
+            else:
+                raise AirflowException('Failed to delete node %s from '
+                                       'cluster!' % redeploy_server)
+
+        # No action to perform
+        else:
+            logging.info('No Action to Perform')
+
+    @shipyard_service_token
+    def promenade_drain_node(self, context, redeploy_server):
+        # Placeholder function. Updates will be made when the Promenade
+        # API is ready for consumption.
+        logging.info("Draining node...")
+
+        return True
+
+    @shipyard_service_token
+    def promenade_remove_labels(self, context, redeploy_server):
+        # Placeholder function. Updates will be made when the Promenade
+        # API is ready for consumption.
+        logging.info("Removing labels on node...")
+
+        return True
+
+    @shipyard_service_token
+    def promenade_stop_kubelet(self, context, redeploy_server):
+        # Placeholder function. Updates will be made when the Promenade
+        # API is ready for consumption.
+        logging.info("Stopping kubelet on node...")
+
+        return True
+
+    @shipyard_service_token
+    def promenade_check_etcd(self, context):
+        # Placeholder function. Updates will be made when the Promenade
+        # API is ready for consumption.
+        logging.info("Performing health check on etcd...")
+
+        return True
+
+    @shipyard_service_token
+    def promenade_delete_node(self, context, redeploy_server):
+        # Placeholder function. Updates will be made when the Promenade
+        # API is ready for consumption.
+        logging.info("Deleting node from cluster...")
+        time.sleep(30)
+        logging.info("Successfully deleted node %s", redeploy_server)
+
+        return True
+
+
+class PromenadeOperatorPlugin(AirflowPlugin):
+    name = 'promenade_operator_plugin'
+    operators = [PromenadeOperator]
diff --git a/tests/unit/control/test.conf b/tests/unit/control/test.conf
index fd1eb072..f689a6f2 100644
--- a/tests/unit/control/test.conf
+++ b/tests/unit/control/test.conf
@@ -11,6 +11,8 @@ service_type = deckhand
 cluster_join_check_backoff_time = 120
 deploy_node_query_interval = 30
 deploy_node_task_timeout = 3600
+destroy_node_query_interval = 30
+destroy_node_task_timeout = 900
 prepare_node_query_interval = 30
 prepare_node_task_timeout = 1800
 prepare_site_query_interval = 10
diff --git a/tools/resources/shipyard.conf b/tools/resources/shipyard.conf
index 29121f9d..779a2444 100644
--- a/tools/resources/shipyard.conf
+++ b/tools/resources/shipyard.conf
@@ -13,6 +13,8 @@ service_type = deckhand
 cluster_join_check_backoff_time = 120
 deploy_node_query_interval = 30
 deploy_node_task_timeout = 3600
+destroy_node_query_interval = 30
+destroy_node_task_timeout = 900
 prepare_node_query_interval = 30
 prepare_node_task_timeout = 1800
 prepare_site_query_interval = 10
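
Reviewer note (not part of the patch): a minimal sketch of how the new
destroy_node_query_interval / destroy_node_task_timeout settings are expected
to be consumed once DryDock's destroy_node task is available. The section and
option names come from the config.get('drydock', ...) calls in
drydock_operators.py; the poll_destroy_node helper and its get_status callback
are illustrative assumptions, since the real polling will happen inside
DryDockOperator.drydock_action() when the TODO above is uncommented.

# Illustrative sketch only, assuming a shipyard.conf at the same path the
# DAGs above pass to the operators.
import configparser
import time

config = configparser.ConfigParser()
config.read('/usr/local/airflow/plugins/shipyard.conf')

# Fallbacks mirror the cfg.IntOpt defaults added in config.py.
query_interval = config.getint('drydock', 'destroy_node_query_interval',
                               fallback=30)
task_timeout = config.getint('drydock', 'destroy_node_task_timeout',
                             fallback=900)


def poll_destroy_node(get_status, query_interval, task_timeout):
    """Poll a hypothetical get_status() callback every query_interval
    seconds until it reports 'complete' or 'failed', giving up after
    task_timeout seconds."""
    deadline = time.time() + task_timeout
    while time.time() < deadline:
        status = get_status()
        if status in ('complete', 'failed'):
            return status
        time.sleep(query_interval)
    raise TimeoutError(
        'destroy_node task timed out after %s seconds' % task_timeout)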