From f7d02238c3a5c53784bf45a5fce5af4d9ede45c6 Mon Sep 17 00:00:00 2001 From: Anthony Lin Date: Fri, 16 Mar 2018 11:06:44 +0000 Subject: [PATCH] Refactor Drydock Operator We will move away from the usage of if/else block and will instead make use of inheritance (drydock base operator) and task specific operators in our dags to execute the workflow Change-Id: Ifea530ad8a8a2a591b511be4f037d7b4b9dd6c6f --- shipyard_airflow/dags/destroy_node.py | 5 +- shipyard_airflow/dags/drydock_deploy_site.py | 19 +- shipyard_airflow/dags/validate_site_design.py | 8 +- .../plugins/drydock_base_operator.py | 289 ++++++++++++ .../plugins/drydock_deploy_nodes.py | 69 +++ .../plugins/drydock_destroy_nodes.py | 52 +++ shipyard_airflow/plugins/drydock_operators.py | 436 ------------------ .../plugins/drydock_prepare_nodes.py | 47 ++ .../plugins/drydock_prepare_site.py | 47 ++ .../plugins/drydock_validate_design.py | 85 ++++ .../plugins/drydock_verify_site.py | 46 ++ 11 files changed, 650 insertions(+), 453 deletions(-) create mode 100644 shipyard_airflow/plugins/drydock_base_operator.py create mode 100644 shipyard_airflow/plugins/drydock_deploy_nodes.py create mode 100644 shipyard_airflow/plugins/drydock_destroy_nodes.py delete mode 100644 shipyard_airflow/plugins/drydock_operators.py create mode 100644 shipyard_airflow/plugins/drydock_prepare_nodes.py create mode 100644 shipyard_airflow/plugins/drydock_prepare_site.py create mode 100644 shipyard_airflow/plugins/drydock_validate_design.py create mode 100644 shipyard_airflow/plugins/drydock_verify_site.py diff --git a/shipyard_airflow/dags/destroy_node.py b/shipyard_airflow/dags/destroy_node.py index ea7f5a0e..33365753 100644 --- a/shipyard_airflow/dags/destroy_node.py +++ b/shipyard_airflow/dags/destroy_node.py @@ -13,7 +13,7 @@ # limitations under the License. 
from airflow.models import DAG -from airflow.operators import DryDockOperator +from airflow.operators import DrydockDestroyNodeOperator from airflow.operators import PromenadeCheckEtcdOperator from airflow.operators import PromenadeClearLabelsOperator from airflow.operators import PromenadeDecommissionNodeOperator @@ -67,10 +67,9 @@ def destroy_server(parent_dag_name, child_dag_name, args): dag=dag) # Power down and destroy node using DryDock - drydock_destroy_node = DryDockOperator( + drydock_destroy_node = DrydockDestroyNodeOperator( task_id='destroy_node', shipyard_conf=config_path, - action='destroy_node', main_dag_name=parent_dag_name, sub_dag_name=child_dag_name, dag=dag) diff --git a/shipyard_airflow/dags/drydock_deploy_site.py b/shipyard_airflow/dags/drydock_deploy_site.py index 651c320e..428a5fd8 100644 --- a/shipyard_airflow/dags/drydock_deploy_site.py +++ b/shipyard_airflow/dags/drydock_deploy_site.py @@ -1,4 +1,4 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,7 +13,10 @@ # limitations under the License. 
from airflow.models import DAG -from airflow.operators import DryDockOperator +from airflow.operators import DrydockDeployNodesOperator +from airflow.operators import DrydockPrepareNodesOperator +from airflow.operators import DrydockPrepareSiteOperator +from airflow.operators import DrydockVerifySiteOperator from config_path import config_path @@ -26,34 +29,30 @@ def deploy_site_drydock(parent_dag_name, child_dag_name, args): '{}.{}'.format(parent_dag_name, child_dag_name), default_args=args) - drydock_verify_site = DryDockOperator( + drydock_verify_site = DrydockVerifySiteOperator( task_id='verify_site', shipyard_conf=config_path, - action='verify_site', main_dag_name=parent_dag_name, sub_dag_name=child_dag_name, dag=dag) - drydock_prepare_site = DryDockOperator( + drydock_prepare_site = DrydockPrepareSiteOperator( task_id='prepare_site', shipyard_conf=config_path, - action='prepare_site', main_dag_name=parent_dag_name, sub_dag_name=child_dag_name, dag=dag) - drydock_prepare_nodes = DryDockOperator( + drydock_prepare_nodes = DrydockPrepareNodesOperator( task_id='prepare_nodes', shipyard_conf=config_path, - action='prepare_nodes', main_dag_name=parent_dag_name, sub_dag_name=child_dag_name, dag=dag) - drydock_deploy_nodes = DryDockOperator( + drydock_deploy_nodes = DrydockDeployNodesOperator( task_id='deploy_nodes', shipyard_conf=config_path, - action='deploy_nodes', main_dag_name=parent_dag_name, sub_dag_name=child_dag_name, dag=dag) diff --git a/shipyard_airflow/dags/validate_site_design.py b/shipyard_airflow/dags/validate_site_design.py index 9778a1c6..23aea500 100644 --- a/shipyard_airflow/dags/validate_site_design.py +++ b/shipyard_airflow/dags/validate_site_design.py @@ -1,4 +1,4 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -15,7 +15,7 @@ from airflow.models import DAG from airflow.operators import ArmadaOperator from airflow.operators import DeckhandValidateSiteDesignOperator -from airflow.operators import DryDockOperator +from airflow.operators import DrydockValidateDesignOperator from config_path import config_path @@ -33,12 +33,12 @@ def validate_site_design(parent_dag_name, child_dag_name, args): shipyard_conf=config_path, main_dag_name=parent_dag_name, sub_dag_name=child_dag_name, + retries=3, dag=dag) - drydock_validate_docs = DryDockOperator( + drydock_validate_docs = DrydockValidateDesignOperator( task_id='drydock_validate_site_design', shipyard_conf=config_path, - action='validate_site_design', main_dag_name=parent_dag_name, sub_dag_name=child_dag_name, retries=3, diff --git a/shipyard_airflow/plugins/drydock_base_operator.py b/shipyard_airflow/plugins/drydock_base_operator.py new file mode 100644 index 00000000..ee65da18 --- /dev/null +++ b/shipyard_airflow/plugins/drydock_base_operator.py @@ -0,0 +1,289 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import logging
+import os
+import time
+from urllib.parse import urlparse
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.plugins_manager import AirflowPlugin
+from airflow.utils.decorators import apply_defaults
+
+import drydock_provisioner.drydock_client.client as client
+import drydock_provisioner.drydock_client.session as session
+from drydock_provisioner import error as errors
+from service_endpoint import ucp_service_endpoint
+from service_token import shipyard_service_token
+from xcom_puller import XcomPuller
+
+
+class DrydockBaseOperator(BaseOperator):
+
+    """Drydock Base Operator
+
+    All drydock related workflow operators will use the drydock
+    base operator as the parent and inherit attributes and methods
+    from this class. Child classes provide do_execute().
+
+    """
+
+    @apply_defaults
+    def __init__(self,
+                 deckhand_design_ref=None,
+                 deckhand_svc_type='deckhand',
+                 drydock_client=None,
+                 drydock_svc_endpoint=None,
+                 drydock_svc_type='physicalprovisioner',
+                 drydock_task_id=None,
+                 main_dag_name=None,
+                 node_filter=None,
+                 redeploy_server=None,
+                 shipyard_conf=None,
+                 sub_dag_name=None,
+                 svc_session=None,
+                 svc_token=None,
+                 xcom_push=True,
+                 *args, **kwargs):
+        """Initialization of DrydockBaseOperator object.
+
+        :param deckhand_design_ref: A URI reference to the design documents
+        :param deckhand_svc_type: Deckhand Service Type
+        :param drydock_client: An instance of drydock client
+        :param drydock_svc_endpoint: Drydock Service Endpoint
+        :param drydock_svc_type: Drydock Service Type
+        :param drydock_task_id: Drydock Task ID
+        :param main_dag_name: Parent Dag
+        :param node_filter: A filter for narrowing the scope of the task.
+                            Valid fields are 'node_names', 'rack_names',
+                            'node_tags'. Note that node filter is turned
+                            off by default, i.e. all nodes will be deployed.
+        :param redeploy_server: Server to be redeployed
+        :param shipyard_conf: Location of shipyard.conf
+        :param sub_dag_name: Child Dag
+        :param svc_session: Keystone Session
+        :param svc_token: Keystone Token
+        :param xcom_push: xcom usage
+
+        The Drydock operator assumes that prior steps have set xcoms for
+        the action and the deployment configuration
+
+        """
+
+        super(DrydockBaseOperator, self).__init__(*args, **kwargs)
+        self.deckhand_design_ref = deckhand_design_ref
+        self.deckhand_svc_type = deckhand_svc_type
+        self.drydock_client = drydock_client
+        self.drydock_svc_endpoint = drydock_svc_endpoint
+        self.drydock_svc_type = drydock_svc_type
+        self.drydock_task_id = drydock_task_id
+        self.main_dag_name = main_dag_name
+        self.node_filter = node_filter
+        self.redeploy_server = redeploy_server
+        self.shipyard_conf = shipyard_conf
+        self.sub_dag_name = sub_dag_name
+        self.svc_session = svc_session
+        self.svc_token = svc_token
+        self.xcom_push_flag = xcom_push
+
+    def execute(self, context):
+        """Run the shared Drydock setup, then the subclass' do_execute()."""
+        # Execute drydock base function
+        self.drydock_base(context)
+
+        # Execute child function
+        self.do_execute()
+
+    def drydock_base(self, context):
+        """Pull the xcom values and build the Drydock client and design ref.
+
+        :param context: Airflow context; its 'task_instance' entry is used
+        :raises AirflowException: if a required value cannot be set up
+        """
+
+        # Define task_instance
+        task_instance = context['task_instance']
+
+        # Set up and retrieve values from xcom
+        self.xcom_puller = XcomPuller(self.main_dag_name, task_instance)
+        self.action_info = self.xcom_puller.get_action_info()
+        self.dc = self.xcom_puller.get_deployment_configuration()
+
+        # Logs uuid of action performed by the Operator
+        logging.info("DryDock Operator for action %s", self.action_info['id'])
+
+        # For the 'redeploy_server' dag the node filter is the server to
+        # redeploy. get() lets a missing 'server-name' reach the check
+        # below instead of surfacing as a KeyError
+        if self.action_info['dag_id'] == 'redeploy_server':
+            self.redeploy_server = (
+                self.action_info['parameters'].get('server-name'))
+
+            if self.redeploy_server:
+                logging.info("Server to be redeployed is %s",
+                             self.redeploy_server)
+                self.node_filter = self.redeploy_server
+            else:
+                raise AirflowException('Unable to retrieve information of '
+                                       'node to be redeployed!')
+
+        # Retrieve Endpoint Information
+        self.drydock_svc_endpoint = ucp_service_endpoint(
+            self, svc_type=self.drydock_svc_type)
+
+        # Parse DryDock Service Endpoint
+        drydock_url = urlparse(self.drydock_svc_endpoint)
+
+        # Build a DrydockSession with credentials and target host
+        # information; it cares for connection pooling and headers
+        logging.info("Build DryDock Session")
+        dd_session = session.DrydockSession(drydock_url.hostname,
+                                            port=drydock_url.port,
+                                            auth_gen=self._auth_gen)
+
+        # Raise Exception if we are not able to set up the session
+        if dd_session:
+            logging.info("Successfully Set Up DryDock Session")
+        else:
+            raise AirflowException("Failed to set up Drydock Session!")
+
+        # Use the DrydockSession to build a DrydockClient that can
+        # be used to make one or more API calls
+        logging.info("Create DryDock Client")
+        self.drydock_client = client.DrydockClient(dd_session)
+
+        # Raise Exception if we are not able to build the client
+        if self.drydock_client:
+            logging.info("Successfully Set Up DryDock client")
+        else:
+            raise AirflowException("Failed to set up Drydock Client!")
+
+        # Retrieve DeckHand Endpoint Information
+        deckhand_svc_endpoint = ucp_service_endpoint(
+            self, svc_type=self.deckhand_svc_type)
+
+        logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
+
+        # Retrieve last committed revision id
+        committed_revision_id = self.xcom_puller.get_design_version()
+
+        # Form DeckHand Design Reference Path
+        # This URL will be used to retrieve the Site Design YAMLs
+        deckhand_path = "deckhand+" + deckhand_svc_endpoint
+        self.deckhand_design_ref = os.path.join(deckhand_path,
+                                                "revisions",
+                                                str(committed_revision_id),
+                                                "rendered-documents")
+        if self.deckhand_design_ref:
+            logging.info("Design YAMLs will be retrieved from %s",
+                         self.deckhand_design_ref)
+        else:
+            raise AirflowException("Unable to Retrieve Design Reference!")
+
+    @shipyard_service_token
+    def _auth_gen(self):
+        # Generator method for the Drydock Session to use to get the
+        # auth headers necessary
+        return [('X-Auth-Token', self.svc_token)]
+
+    def create_task(self, task_action):
+        """Create a Drydock task and return its task ID.
+
+        :param task_action: Drydock task action, e.g. 'deploy_nodes'
+        :raises AirflowException: on a client error or a missing task ID
+        """
+
+        # Node Filter
+        logging.info("Nodes Filter List: %s", self.node_filter)
+
+        try:
+            # Create Task
+            create_task_response = self.drydock_client.create_task(
+                design_ref=self.deckhand_design_ref,
+                task_action=task_action,
+                node_filter=self.node_filter)
+        except errors.ClientError as client_error:
+            raise AirflowException(client_error)
+
+        # Retrieve Task ID; get() lets a response without 'task_id'
+        # reach the check below instead of surfacing as a KeyError
+        self.drydock_task_id = create_task_response.get('task_id')
+        logging.info('Drydock %s task ID is %s',
+                     task_action, self.drydock_task_id)
+
+        # Raise Exception if Drydock did not return a task_id
+        if not self.drydock_task_id:
+            raise AirflowException("Unable to create task!")
+        return self.drydock_task_id
+
+    def query_task(self, interval, time_out):
+        """Poll the Drydock task until it ends or time_out is reached.
+
+        :param interval: Seconds between queries (int or numeric string)
+        :param time_out: Seconds to wait overall (int or numeric string)
+        :raises AirflowException: on client error, time out or failure
+        """
+
+        # Number of queries: 'time_out' divided by 'interval' (both are
+        # converted to integers), rounded to the nearest whole number
+        end_range = round(int(time_out) / int(interval))
+
+        # Start from a known state so that a failed first query is
+        # retried instead of hitting an unbound local variable
+        task_status = task_result = None
+
+        logging.info('Task ID is %s', self.drydock_task_id)
+
+        # Query task status
+        for i in range(0, end_range + 1):
+            try:
+                # Retrieve current task state
+                task_state = self.drydock_client.get_task(
+                    task_id=self.drydock_task_id)
+                task_status = task_state['status']
+                task_result = task_state['result']['status']
+
+                logging.info("Current status of task id %s is %s",
+                             self.drydock_task_id, task_status)
+            except errors.ClientError as client_error:
+                raise AirflowException(client_error)
+            except Exception:
+                # Anything else is treated as an intermittent (e.g. network)
+                # issue; the task state is queried again on the next pass.
+                logging.warning("Unable to retrieve task state. Retrying...")
+
+            # Raise Time Out Exception
+            if task_status == 'running' and i == end_range:
+                raise AirflowException("Task Execution Timed Out!")
+
+            # Stop polling once the task is 'complete' or 'terminated'
+            if task_status in ['complete', 'terminated']:
+                logging.info('Task result is %s', task_result)
+                break
+            time.sleep(int(interval))
+
+        # Get final task result
+        if task_result != 'success':
+            raise AirflowException("Failed to execute/complete task!")
+        logging.info('Task id %s has been successfully completed',
+                     self.drydock_task_id)
+
+
+class DrydockBaseOperatorPlugin(AirflowPlugin):
+
+    """Creates DrydockBaseOperator in Airflow."""
+
+    name = 'drydock_base_operator_plugin'
+    operators = [DrydockBaseOperator]
diff --git a/shipyard_airflow/plugins/drydock_deploy_nodes.py b/shipyard_airflow/plugins/drydock_deploy_nodes.py
new file mode 100644
index 00000000..63afeaa7
--- /dev/null
+++ b/shipyard_airflow/plugins/drydock_deploy_nodes.py
@@ -0,0 +1,69 @@
+# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import time
+
+from airflow.plugins_manager import AirflowPlugin
+
+from check_k8s_node_status import check_node_status
+from drydock_base_operator import DrydockBaseOperator
+
+
+class DrydockDeployNodesOperator(DrydockBaseOperator):
+
+    """Drydock Deploy Nodes Operator
+
+    This operator will trigger drydock to deploy the bare metal
+    nodes
+
+    """
+
+    def do_execute(self):
+        """Deploy the nodes, then wait for them to join the cluster."""
+
+        # Have DryDock start the 'deploy_nodes' task
+        self.create_task('deploy_nodes')
+
+        # Poll the task until it ends, using the interval and timeout
+        # from the deployment configuration
+        self.query_task(
+            self.dc['physical_provisioner.deploy_interval'],
+            self.dc['physical_provisioner.deploy_timeout'])
+
+        # The cluster join process is not triggered on every node
+        # right away, so back off for the configured time before
+        # looking at the state of the nodes.
+        backoff = self.dc['physical_provisioner.join_wait']
+
+        logging.info("All nodes deployed in MAAS")
+        logging.info("Wait for %d seconds before checking node state...",
+                     backoff)
+
+        time.sleep(backoff)
+
+        # deploy_nodes only counts as completed once the cluster
+        # join process has finished, so check the node status with
+        # the configured timeout and interval.
+        check_node_status(
+            self.dc['kubernetes.node_status_timeout'],
+            self.dc['kubernetes.node_status_interval'])
+
+
+class DrydockDeployNodesOperatorPlugin(AirflowPlugin):
+
+    """Creates DrydockDeployNodesOperator in Airflow."""
+
+    name = 'drydock_deploy_nodes_operator'
+    operators = [DrydockDeployNodesOperator]
diff --git a/shipyard_airflow/plugins/drydock_destroy_nodes.py b/shipyard_airflow/plugins/drydock_destroy_nodes.py
new file mode 100644
index 00000000..ef5918a1
--- /dev/null
+++ b/shipyard_airflow/plugins/drydock_destroy_nodes.py
@@ -0,0 +1,52 @@
+# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import time
+
+from airflow.plugins_manager import AirflowPlugin
+
+from drydock_base_operator import DrydockBaseOperator
+
+
+class DrydockDestroyNodeOperator(DrydockBaseOperator):
+
+    """Drydock Destroy Node Operator
+
+    Placeholder for destroying a bare metal node; no Drydock
+    task is created by this operator yet
+
+    """
+
+    def do_execute(self):
+        """Placeholder: log the node destruction and pause for 15 seconds."""
+        # Retrieve query interval and timeout (unused by the placeholder)
+        poll_interval = self.dc['physical_provisioner.destroy_interval']
+        timeout = self.dc['physical_provisioner.destroy_timeout']
+
+        # NOTE: This is a placeholder. The 'destroy_node' functionality
+        # in DryDock is still being worked on and is not ready at the
+        # moment.
+        node = self.redeploy_server
+        logging.info("Destroying node %s from cluster...", node)
+        time.sleep(15)
+        logging.info("Successfully deleted node %s", node)
+
+
+class DrydockDestroyNodeOperatorPlugin(AirflowPlugin):
+
+    """Creates DrydockDestroyNodeOperator in Airflow."""
+
+    name = 'drydock_destroy_node_operator'
+    operators = [DrydockDestroyNodeOperator]
diff --git a/shipyard_airflow/plugins/drydock_operators.py b/shipyard_airflow/plugins/drydock_operators.py
deleted file mode 100644
index c87c531b..00000000
--- a/shipyard_airflow/plugins/drydock_operators.py
+++ /dev/null
@@ -1,436 +0,0 @@
-# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
-# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import json -import logging -import os -import requests -import time -from urllib.parse import urlparse - -from airflow.exceptions import AirflowException -from airflow.models import BaseOperator -from airflow.plugins_manager import AirflowPlugin -from airflow.utils.decorators import apply_defaults - -import drydock_provisioner.drydock_client.client as client -import drydock_provisioner.drydock_client.session as session -from check_k8s_node_status import check_node_status -from drydock_provisioner import error as errors -from service_endpoint import ucp_service_endpoint -from service_token import shipyard_service_token -from xcom_puller import XcomPuller - - -class DryDockOperator(BaseOperator): - """DryDock Client""" - @apply_defaults - def __init__(self, - action=None, - design_ref=None, - main_dag_name=None, - node_filter=None, - shipyard_conf=None, - svc_token=None, - sub_dag_name=None, - xcom_push=True, - *args, **kwargs): - """ - :param action: Task to perform - :param design_ref: A URI reference to the design documents - :param main_dag_name: Parent Dag - :param node_filter: A filter for narrowing the scope of the task. 
Valid - fields are 'node_names', 'rack_names', 'node_tags' - :param shipyard_conf: Location of shipyard.conf - :param sub_dag_name: Child Dag - - The Drydock operator assumes that prior steps have set xcoms for - the action and the deployment configuration - """ - - super(DryDockOperator, self).__init__(*args, **kwargs) - self.action = action - self.design_ref = design_ref - self.main_dag_name = main_dag_name - self.node_filter = node_filter - self.shipyard_conf = shipyard_conf - self.svc_token = svc_token - self.sub_dag_name = sub_dag_name - self.xcom_push_flag = xcom_push - - def execute(self, context): - # Initialize Variable - redeploy_server = None - - # Placeholder definition - # TODO: Need to decide how to pass the required value from Shipyard to - # the 'node_filter' variable. No filter will be used for now. - self.node_filter = None - - # Define task_instance - task_instance = context['task_instance'] - - # Set up and retrieve values from xcom - self.xcom_puller = XcomPuller(self.main_dag_name, task_instance) - self.action_info = self.xcom_puller.get_action_info() - self.dc = self.xcom_puller.get_deployment_configuration() - - # Logs uuid of action performed by the Operator - logging.info("DryDock Operator for action %s", self.action_info['id']) - - # Retrieve information of the server that we want to redeploy if user - # executes the 'redeploy_server' dag - # Set node filter to be the server that we want to redeploy - if self.action_info['dag_id'] == 'redeploy_server': - redeploy_server = self.action_info['parameters'].get('server-name') - - if redeploy_server: - logging.info("Server to be redeployed is %s", redeploy_server) - self.node_filter = redeploy_server - else: - raise AirflowException('Unable to retrieve information of ' - 'node to be redeployed!') - - # Retrieve Deckhand Design Reference - self.design_ref = self.get_deckhand_design_ref(context) - - if self.design_ref: - logging.info("Drydock YAMLs will be retrieved from %s", - self.design_ref) - 
else: - raise AirflowException("Unable to Retrieve Design Reference!") - - # Drydock Validate Site Design - if self.action == 'validate_site_design': - # Initialize variable - site_design_validity = 'invalid' - - # Retrieve Endpoint Information - svc_type = 'physicalprovisioner' - drydock_svc_endpoint = ucp_service_endpoint(self, - svc_type=svc_type) - - site_design_validity = self.drydock_validate_design( - drydock_svc_endpoint) - - return site_design_validity - - # DrydockClient - # Retrieve Endpoint Information - svc_type = 'physicalprovisioner' - drydock_svc_endpoint = ucp_service_endpoint(self, - svc_type=svc_type) - logging.info("DryDock endpoint is %s", drydock_svc_endpoint) - - # Set up DryDock Client - drydock_client = self.drydock_session_client(drydock_svc_endpoint) - - # Create Task for verify_site - if self.action == 'verify_site': - q_interval = self.dc['physical_provisioner.verify_interval'] - task_timeout = self.dc['physical_provisioner.verify_timeout'] - self.drydock_action(drydock_client, context, self.action, - q_interval, task_timeout) - - # Create Task for prepare_site - elif self.action == 'prepare_site': - q_interval = self.dc['physical_provisioner.prepare_site_interval'] - task_timeout = self.dc['physical_provisioner.prepare_site_timeout'] - self.drydock_action(drydock_client, context, self.action, - q_interval, task_timeout) - - # Create Task for prepare_node - elif self.action == 'prepare_nodes': - q_interval = self.dc['physical_provisioner.prepare_node_interval'] - task_timeout = self.dc['physical_provisioner.prepare_node_timeout'] - self.drydock_action(drydock_client, context, self.action, - q_interval, task_timeout) - - # Create Task for deploy_node - elif self.action == 'deploy_nodes': - q_interval = self.dc['physical_provisioner.deploy_interval'] - task_timeout = self.dc['physical_provisioner.deploy_timeout'] - self.drydock_action(drydock_client, context, self.action, - q_interval, task_timeout) - - # Wait for 120 seconds (default 
value) before checking the cluster - # join process as it takes time for process to be triggered across - # all nodes - join_wait = self.dc['physical_provisioner.join_wait'] - logging.info("All nodes deployed in MAAS") - logging.info("Wait for %d seconds before checking node state...", - join_wait) - time.sleep(join_wait) - # Check that cluster join process is completed before declaring - # deploy_node as 'completed'. - node_st_timeout = self.dc['kubernetes.node_status_timeout'] - node_st_interval = self.dc['kubernetes.node_status_interval'] - check_node_status(node_st_timeout, node_st_interval) - - # Create Task for destroy_node - # NOTE: This is a PlaceHolder function. The 'destroy_node' - # functionalities in DryDock is being worked on and is not - # ready at the moment. - elif self.action == 'destroy_node': - # see deployment_configuration_operator.py for defaults - q_interval = self.dc['physical_provisioner.destroy_interval'] - task_timeout = self.dc['physical_provisioner.destroy_timeout'] - - logging.info("Destroying node %s from cluster...", redeploy_server) - time.sleep(15) - logging.info("Successfully deleted node %s", redeploy_server) - - # TODO: Uncomment when the function to destroy/delete node is - # ready for consumption in Drydock - # self.drydock_action(drydock_client, context, self.action, - # q_interval, task_timeout) - - # Do not perform any action - else: - logging.info('No Action to Perform') - - @shipyard_service_token - def _auth_gen(self): - # Generator method for the Drydock Session to use to get the - # auth headers necessary - return [('X-Auth-Token', self.svc_token)] - - def drydock_session_client(self, drydock_svc_endpoint): - # Initialize Variables - drydock_url = None - dd_session = None - dd_client = None - - # Parse DryDock Service Endpoint - drydock_url = urlparse(drydock_svc_endpoint) - - # Build a DrydockSession with credentials and target host - # information. 
- logging.info("Build DryDock Session") - dd_session = session.DrydockSession(drydock_url.hostname, - port=drydock_url.port, - auth_gen=self._auth_gen) - - # Raise Exception if we are not able to get a drydock session - if dd_session: - logging.info("Successfully Set Up DryDock Session") - else: - raise AirflowException("Failed to set up Drydock Session!") - - # Use session to build a DrydockClient to make one or more API calls - # The DrydockSession will care for TCP connection pooling - # and header management - logging.info("Create DryDock Client") - dd_client = client.DrydockClient(dd_session) - - # Raise Exception if we are not able to build drydock client - if dd_client: - logging.info("Successfully Set Up DryDock client") - else: - raise AirflowException("Unable to set up Drydock Client!") - - # Drydock client for XCOM Usage - return dd_client - - def drydock_action(self, drydock_client, context, action, interval, - time_out): - - # Trigger DryDock to execute task and retrieve task ID - task_id = self.drydock_perform_task(drydock_client, context, - action, self.node_filter) - - logging.info('Task ID is %s', task_id) - - # Query Task - self.drydock_query_task(drydock_client, context, interval, - time_out, task_id) - - def drydock_perform_task(self, drydock_client, context, - perform_task, nodes_filter): - - # Initialize Variables - create_task_response = {} - task_id = None - - # Node Filter - logging.info("Nodes Filter List: %s", nodes_filter) - - # Create Task - create_task_response = drydock_client.create_task( - design_ref=self.design_ref, - task_action=perform_task, - node_filter=nodes_filter) - - # Retrieve Task ID - task_id = create_task_response.get('task_id') - logging.info('Drydock %s task ID is %s', perform_task, task_id) - - # Raise Exception if we are not able to get the task_id from - # drydock - if task_id: - return task_id - else: - raise AirflowException("Unable to create task!") - - def drydock_query_task(self, drydock_client, context, 
interval, - time_out, task_id): - - # Initialize Variables - keystone_token_expired = False - new_dd_client = None - dd_client = drydock_client - - # Calculate number of times to execute the 'for' loop - # Convert 'time_out' and 'interval' from string into integer - # The result from the division will be a floating number which - # We will round off to nearest whole number - end_range = round(int(time_out) / int(interval)) - - # Query task status - for i in range(0, end_range + 1): - - if keystone_token_expired: - logging.info("Established new drydock session") - dd_client = new_dd_client - - try: - # Retrieve current task state - task_state = dd_client.get_task(task_id=task_id) - task_status = task_state.get('status') - task_result = task_state.get('result')['status'] - - logging.info("Current status of task id %s is %s", - task_id, task_status) - - keystone_token_expired = False - - except errors.ClientUnauthorizedError as unauthorized_error: - - # TODO: This is a temporary workaround. Drydock will be - # updated with the appropriate fix in the drydock api - # client by having the session detect a 401/403 response - # and refresh the token appropriately. - # Logs drydock client unauthorized error - keystone_token_expired = True - logging.error(unauthorized_error) - - # Set up new drydock client with new keystone token - logging.info("Setting up new drydock session...") - - drydock_svc_endpoint = ucp_service_endpoint( - self, svc_type='physicalprovisioner') - - new_dd_client = self.drydock_session_client( - drydock_svc_endpoint) - - except errors.ClientForbiddenError as forbidden_error: - raise AirflowException(forbidden_error) - - except errors.ClientError as client_error: - raise AirflowException(client_error) - - except: - # There can be instances where there are intermittent network - # issues that prevents us from retrieving the task state. We - # will want to retry in such situations. - logging.info("Unable to retrieve task state. 
Retrying...") - - # Raise Time Out Exception - if task_status == 'running' and i == end_range: - raise AirflowException("Task Execution Timed Out!") - - # Exit 'for' loop if the task is in 'complete' or 'terminated' - # state - if task_status in ['complete', 'terminated']: - logging.info('Task result is %s', task_result) - break - else: - time.sleep(int(interval)) - - # Get final task result - if task_result == 'success': - logging.info('Task id %s has been successfully completed', - self.task_id) - else: - raise AirflowException("Failed to execute/complete task!") - - def get_deckhand_design_ref(self, context): - - # Retrieve DeckHand Endpoint Information - svc_type = 'deckhand' - deckhand_svc_endpoint = ucp_service_endpoint(self, - svc_type=svc_type) - logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint) - - committed_revision_id = self.xcom_puller.get_design_version() - - # Form DeckHand Design Reference Path that we will use to retrieve - # the DryDock YAMLs - deckhand_path = "deckhand+" + deckhand_svc_endpoint - deckhand_design_ref = os.path.join(deckhand_path, - "revisions", - str(committed_revision_id), - "rendered-documents") - - return deckhand_design_ref - - @shipyard_service_token - def drydock_validate_design(self, drydock_svc_endpoint): - - # Form Validation Endpoint - validation_endpoint = os.path.join(drydock_svc_endpoint, - 'validatedesign') - - logging.info("Validation Endpoint is %s", validation_endpoint) - - # Define Headers and Payload - headers = { - 'Content-Type': 'application/json', - 'X-Auth-Token': self.svc_token - } - - payload = { - 'rel': "design", - 'href': self.design_ref, - 'type': "application/x-yaml" - } - - # Requests DryDock to validate site design - logging.info("Waiting for DryDock to validate site design...") - - try: - design_validate_response = requests.post(validation_endpoint, - headers=headers, - data=json.dumps(payload)) - except requests.exceptions.RequestException as e: - raise AirflowException(e) - - # 
Convert response to string - validate_site_design = design_validate_response.text - - # Print response - logging.info("Retrieving DryDock validate site design response...") - logging.info(json.loads(validate_site_design)) - - # Check if site design is valid - if json.loads(validate_site_design).get('status') == 'Success': - logging.info("DryDock Site Design has been successfully validated") - return 'valid' - else: - raise AirflowException("DryDock Site Design Validation Failed!") - - -class DryDockClientPlugin(AirflowPlugin): - name = "drydock_client_plugin" - operators = [DryDockOperator] diff --git a/shipyard_airflow/plugins/drydock_prepare_nodes.py b/shipyard_airflow/plugins/drydock_prepare_nodes.py new file mode 100644 index 00000000..2aa1a690 --- /dev/null +++ b/shipyard_airflow/plugins/drydock_prepare_nodes.py @@ -0,0 +1,47 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+from airflow.plugins_manager import AirflowPlugin
+
+from drydock_base_operator import DrydockBaseOperator
+
+
+class DrydockPrepareNodesOperator(DrydockBaseOperator):
+
+    """Drydock Prepare Nodes Operator
+
+    This operator will trigger drydock to prepare nodes for
+    site deployment
+
+    """
+
+    def do_execute(self):
+        """Create the 'prepare_nodes' task in Drydock, then query it."""
+        # Trigger DryDock to execute task (this operator prepares nodes)
+        self.create_task('prepare_nodes')
+
+        # Retrieve query interval and timeout
+        q_interval = self.dc['physical_provisioner.prepare_node_interval']
+        task_timeout = self.dc['physical_provisioner.prepare_node_timeout']
+
+        # Query Task
+        self.query_task(q_interval, task_timeout)
+
+
+class DrydockPrepareNodesOperatorPlugin(AirflowPlugin):
+
+    """Creates DrydockPrepareNodesOperator in Airflow."""
+
+    name = 'drydock_prepare_nodes_operator'
+    operators = [DrydockPrepareNodesOperator]
diff --git a/shipyard_airflow/plugins/drydock_prepare_site.py b/shipyard_airflow/plugins/drydock_prepare_site.py
new file mode 100644
index 00000000..a9edaa77
--- /dev/null
+++ b/shipyard_airflow/plugins/drydock_prepare_site.py
@@ -0,0 +1,47 @@
+# Copyright 2018 AT&T Intellectual Property.  All other rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.plugins_manager import AirflowPlugin
+
+from drydock_base_operator import DrydockBaseOperator
+
+
+class DrydockPrepareSiteOperator(DrydockBaseOperator):
+
+    """Drydock Prepare Site Operator
+
+    Creates the 'prepare_site' task in drydock and then queries
+    that task as part of site deployment
+
+    """
+
+    def do_execute(self):
+        """Start the 'prepare_site' task in Drydock, then query it."""
+
+        # Ask DryDock to create the task
+        self.create_task('prepare_site')
+
+        # Query the task, passing the interval and timeout that are
+        # configured for this step (both are looked up in self.dc)
+        self.query_task(
+            self.dc['physical_provisioner.prepare_site_interval'],
+            self.dc['physical_provisioner.prepare_site_timeout'])
+
+
+class DrydockPrepareSiteOperatorPlugin(AirflowPlugin):
+
+    """Creates DrydockPrepareSiteOperator in Airflow."""
+
+    name = 'drydock_prepare_site_operator'
+    operators = [DrydockPrepareSiteOperator]
diff --git a/shipyard_airflow/plugins/drydock_validate_design.py b/shipyard_airflow/plugins/drydock_validate_design.py
new file mode 100644
index 00000000..1b769f8b
--- /dev/null
+++ b/shipyard_airflow/plugins/drydock_validate_design.py
@@ -0,0 +1,85 @@
+# Copyright 2018 AT&T Intellectual Property.  All other rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import logging
+import os
+import requests
+
+from airflow.plugins_manager import AirflowPlugin
+from airflow.exceptions import AirflowException
+
+from drydock_base_operator import DrydockBaseOperator
+
+
+class DrydockValidateDesignOperator(DrydockBaseOperator):
+
+    """Drydock Validate Design Operator
+
+    This operator will trigger drydock to validate the
+    site design
+
+    """
+
+    def do_execute(self):
+        """Ask Drydock to validate the design; raise on any failure."""
+        # Form Validation Endpoint
+        validation_endpoint = os.path.join(self.drydock_svc_endpoint,
+                                           'validatedesign')
+
+        logging.info("Validation Endpoint is %s", validation_endpoint)
+
+        # Define Headers and Payload
+        headers = {
+            'Content-Type': 'application/json',
+            'X-Auth-Token': self.svc_token
+        }
+
+        payload = {
+            'rel': "design",
+            'href': self.deckhand_design_ref,
+            'type': "application/x-yaml"
+        }
+
+        # Requests DryDock to validate site design
+        logging.info("Waiting for DryDock to validate site design...")
+        try:
+            design_validate_response = requests.post(validation_endpoint,
+                                                     headers=headers,
+                                                     data=json.dumps(payload))
+        except requests.exceptions.RequestException as e:
+            raise AirflowException(e)
+
+        # Parse the body once; a reply that is not JSON must fail the
+        # task as an AirflowException instead of a raw ValueError
+        logging.info("Retrieving DryDock validate site design response...")
+        try:
+            validate_site_design = json.loads(design_validate_response.text)
+        except ValueError as e:
+            raise AirflowException(
+                "DryDock returned a non-JSON validation response: %s" % e)
+        logging.info(validate_site_design)
+
+        # Check if site design is valid
+        if validate_site_design.get('status') != 'Success':
+            raise AirflowException("DryDock Site Design Validation Failed!")
+        logging.info("DryDock Site Design has been successfully validated")
+
+
+class DrydockValidateDesignOperatorPlugin(AirflowPlugin):
+
+    """Creates DrydockValidateDesignOperator in Airflow."""
+
+    name = 'drydock_validate_design_operator'
+    operators = [DrydockValidateDesignOperator]
diff --git a/shipyard_airflow/plugins/drydock_verify_site.py b/shipyard_airflow/plugins/drydock_verify_site.py
new file mode 100644
index 00000000..3954f72d
---
/dev/null
+++ b/shipyard_airflow/plugins/drydock_verify_site.py
@@ -0,0 +1,46 @@
+# Copyright 2018 AT&T Intellectual Property.  All other rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.plugins_manager import AirflowPlugin
+
+from drydock_base_operator import DrydockBaseOperator
+
+
+class DrydockVerifySiteOperator(DrydockBaseOperator):
+
+    """Drydock Verify Site Operator
+
+    Creates the 'verify_site' task in drydock and then queries it
+
+    """
+
+    def do_execute(self):
+        """Start the 'verify_site' task in Drydock, then query it."""
+
+        # Ask DryDock to create the task
+        self.create_task('verify_site')
+
+        # Query the task, passing the interval and timeout that are
+        # configured for this step (both are looked up in self.dc)
+        self.query_task(
+            self.dc['physical_provisioner.verify_interval'],
+            self.dc['physical_provisioner.verify_timeout'])
+
+
+class DrydockVerifySiteOperatorPlugin(AirflowPlugin):
+
+    """Creates DrydockVerifySiteOperator in Airflow."""
+
+    name = 'drydock_verify_site_operator'
+    operators = [DrydockVerifySiteOperator]