Refactor Drydock Operator

We will move away from the single operator that dispatches on an if/else
block and will instead make use of inheritance (a Drydock base operator)
and task-specific operators in our DAGs to execute the workflow

Change-Id: Ifea530ad8a8a2a591b511be4f037d7b4b9dd6c6f
Anthony Lin 2018-03-16 11:06:44 +00:00
parent 7219519135
commit f7d02238c3
11 changed files with 650 additions and 453 deletions
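
At its core the change is the classic template-method arrangement: the base
operator performs the shared Drydock setup in execute() and then hands off to
a do_execute() hook that each task-specific operator overrides. A minimal,
framework-free sketch of the pattern (assumed names, not the real Shipyard
classes):

    # Before: one operator branches on an 'action' string.
    class MonolithicOperator:
        def __init__(self, action):
            self.action = action

        def execute(self, context):
            if self.action == 'verify_site':
                print('verify site logic')
            elif self.action == 'prepare_site':
                print('prepare site logic')

    # After: shared setup lives in the base class; each task-specific
    # operator only supplies its do_execute() hook.
    class ExampleBaseOperator:
        def execute(self, context):
            self.setup(context)  # shared Drydock session/client setup
            self.do_execute()    # task-specific behaviour

        def setup(self, context):
            print('common setup')

    class ExampleVerifySiteOperator(ExampleBaseOperator):
        def do_execute(self):
            print('verify site logic')

    ExampleVerifySiteOperator().execute(context={})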

View File

@@ -13,7 +13,7 @@
 # limitations under the License.

 from airflow.models import DAG
-from airflow.operators import DryDockOperator
+from airflow.operators import DrydockDestroyNodeOperator
 from airflow.operators import PromenadeCheckEtcdOperator
 from airflow.operators import PromenadeClearLabelsOperator
 from airflow.operators import PromenadeDecommissionNodeOperator
@@ -67,10 +67,9 @@ def destroy_server(parent_dag_name, child_dag_name, args):
         dag=dag)

     # Power down and destroy node using DryDock
-    drydock_destroy_node = DryDockOperator(
+    drydock_destroy_node = DrydockDestroyNodeOperator(
         task_id='destroy_node',
         shipyard_conf=config_path,
-        action='destroy_node',
         main_dag_name=parent_dag_name,
         sub_dag_name=child_dag_name,
         dag=dag)

View File

@@ -1,4 +1,4 @@
-# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
+# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,7 +13,10 @@
 # limitations under the License.

 from airflow.models import DAG
-from airflow.operators import DryDockOperator
+from airflow.operators import DrydockDeployNodesOperator
+from airflow.operators import DrydockPrepareNodesOperator
+from airflow.operators import DrydockPrepareSiteOperator
+from airflow.operators import DrydockVerifySiteOperator
 from config_path import config_path
@@ -26,34 +29,30 @@ def deploy_site_drydock(parent_dag_name, child_dag_name, args):
         '{}.{}'.format(parent_dag_name, child_dag_name),
         default_args=args)

-    drydock_verify_site = DryDockOperator(
+    drydock_verify_site = DrydockVerifySiteOperator(
         task_id='verify_site',
         shipyard_conf=config_path,
-        action='verify_site',
         main_dag_name=parent_dag_name,
         sub_dag_name=child_dag_name,
         dag=dag)

-    drydock_prepare_site = DryDockOperator(
+    drydock_prepare_site = DrydockPrepareSiteOperator(
         task_id='prepare_site',
         shipyard_conf=config_path,
-        action='prepare_site',
         main_dag_name=parent_dag_name,
         sub_dag_name=child_dag_name,
         dag=dag)

-    drydock_prepare_nodes = DryDockOperator(
+    drydock_prepare_nodes = DrydockPrepareNodesOperator(
         task_id='prepare_nodes',
         shipyard_conf=config_path,
-        action='prepare_nodes',
         main_dag_name=parent_dag_name,
         sub_dag_name=child_dag_name,
         dag=dag)

-    drydock_deploy_nodes = DryDockOperator(
+    drydock_deploy_nodes = DrydockDeployNodesOperator(
         task_id='deploy_nodes',
         shipyard_conf=config_path,
-        action='deploy_nodes',
         main_dag_name=parent_dag_name,
         sub_dag_name=child_dag_name,
         dag=dag)
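
The hunks above only swap in the new operator classes; the task ordering for
this subdag lives in the unchanged remainder of the file. Assuming the usual
Airflow dependency calls, the four steps chain along these lines (a sketch,
not part of this diff):

    # Hypothetical wiring; the actual dependency statements are in the
    # unchanged portion of the file.
    drydock_prepare_site.set_upstream(drydock_verify_site)
    drydock_prepare_nodes.set_upstream(drydock_prepare_site)
    drydock_deploy_nodes.set_upstream(drydock_prepare_nodes)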

View File

@@ -1,4 +1,4 @@
-# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
+# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -15,7 +15,7 @@
 from airflow.models import DAG
 from airflow.operators import ArmadaOperator
 from airflow.operators import DeckhandValidateSiteDesignOperator
-from airflow.operators import DryDockOperator
+from airflow.operators import DrydockValidateDesignOperator
 from config_path import config_path
@@ -33,12 +33,12 @@ def validate_site_design(parent_dag_name, child_dag_name, args):
         shipyard_conf=config_path,
         main_dag_name=parent_dag_name,
         sub_dag_name=child_dag_name,
+        retries=3,
         dag=dag)

-    drydock_validate_docs = DryDockOperator(
+    drydock_validate_docs = DrydockValidateDesignOperator(
         task_id='drydock_validate_site_design',
         shipyard_conf=config_path,
-        action='validate_site_design',
         main_dag_name=parent_dag_name,
         sub_dag_name=child_dag_name,
         retries=3,

View File

@@ -0,0 +1,289 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import time
from urllib.parse import urlparse
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
import drydock_provisioner.drydock_client.client as client
import drydock_provisioner.drydock_client.session as session
from drydock_provisioner import error as errors
from service_endpoint import ucp_service_endpoint
from service_token import shipyard_service_token
from xcom_puller import XcomPuller
class DrydockBaseOperator(BaseOperator):
"""Drydock Base Operator
    All Drydock-related workflow operators will use the Drydock
    base operator as the parent and inherit attributes and methods
    from this class
"""
@apply_defaults
def __init__(self,
deckhand_design_ref=None,
deckhand_svc_type='deckhand',
drydock_client=None,
drydock_svc_endpoint=None,
drydock_svc_type='physicalprovisioner',
drydock_task_id=None,
main_dag_name=None,
node_filter=None,
redeploy_server=None,
shipyard_conf=None,
sub_dag_name=None,
svc_session=None,
svc_token=None,
xcom_push=True,
*args, **kwargs):
"""Initialization of DrydockBaseOperator object.
:param deckhand_design_ref: A URI reference to the design documents
:param deckhand_svc_type: Deckhand Service Type
        :param drydock_client: An instance of drydock client
:param drydock_svc_endpoint: Drydock Service Endpoint
:param drydock_svc_type: Drydock Service Type
:param drydock_task_id: Drydock Task ID
:param main_dag_name: Parent Dag
:param node_filter: A filter for narrowing the scope of the task.
Valid fields are 'node_names', 'rack_names',
'node_tags'. Note that node filter is turned
off by default, i.e. all nodes will be deployed.
:param redeploy_server: Server to be redeployed
:param shipyard_conf: Location of shipyard.conf
:param sub_dag_name: Child Dag
:param svc_session: Keystone Session
:param svc_token: Keystone Token
:param xcom_push: xcom usage
The Drydock operator assumes that prior steps have set xcoms for
the action and the deployment configuration
"""
super(DrydockBaseOperator, self).__init__(*args, **kwargs)
self.deckhand_design_ref = deckhand_design_ref
self.deckhand_svc_type = deckhand_svc_type
self.drydock_client = drydock_client
self.drydock_svc_endpoint = drydock_svc_endpoint
self.drydock_svc_type = drydock_svc_type
self.drydock_task_id = drydock_task_id
self.main_dag_name = main_dag_name
self.node_filter = node_filter
self.redeploy_server = redeploy_server
self.shipyard_conf = shipyard_conf
        self.sub_dag_name = sub_dag_name
        self.svc_session = svc_session
        self.svc_token = svc_token
self.xcom_push_flag = xcom_push
def execute(self, context):
# Execute drydock base function
self.drydock_base(context)
        # Execute child function
self.do_execute()
def drydock_base(self, context):
# Initialize Variables
drydock_url = None
dd_session = None
# Define task_instance
task_instance = context['task_instance']
# Set up and retrieve values from xcom
self.xcom_puller = XcomPuller(self.main_dag_name, task_instance)
self.action_info = self.xcom_puller.get_action_info()
self.dc = self.xcom_puller.get_deployment_configuration()
# Logs uuid of action performed by the Operator
logging.info("DryDock Operator for action %s", self.action_info['id'])
# Retrieve information of the server that we want to redeploy if user
# executes the 'redeploy_server' dag
# Set node filter to be the server that we want to redeploy
if self.action_info['dag_id'] == 'redeploy_server':
self.redeploy_server = (
self.action_info['parameters']['server-name'])
if self.redeploy_server:
logging.info("Server to be redeployed is %s",
self.redeploy_server)
self.node_filter = self.redeploy_server
else:
raise AirflowException('Unable to retrieve information of '
'node to be redeployed!')
# Retrieve Endpoint Information
self.drydock_svc_endpoint = ucp_service_endpoint(
self, svc_type=self.drydock_svc_type)
# Parse DryDock Service Endpoint
drydock_url = urlparse(self.drydock_svc_endpoint)
# Build a DrydockSession with credentials and target host
# information.
# The DrydockSession will care for TCP connection pooling
# and header management
logging.info("Build DryDock Session")
dd_session = session.DrydockSession(drydock_url.hostname,
port=drydock_url.port,
auth_gen=self._auth_gen)
# Raise Exception if we are not able to set up the session
if dd_session:
logging.info("Successfully Set Up DryDock Session")
else:
raise AirflowException("Failed to set up Drydock Session!")
# Use the DrydockSession to build a DrydockClient that can
# be used to make one or more API calls
logging.info("Create DryDock Client")
self.drydock_client = client.DrydockClient(dd_session)
# Raise Exception if we are not able to build the client
if self.drydock_client:
logging.info("Successfully Set Up DryDock client")
else:
raise AirflowException("Failed to set up Drydock Client!")
# Retrieve DeckHand Endpoint Information
deckhand_svc_endpoint = ucp_service_endpoint(
self, svc_type=self.deckhand_svc_type)
logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
# Retrieve last committed revision id
committed_revision_id = self.xcom_puller.get_design_version()
# Form DeckHand Design Reference Path
# This URL will be used to retrieve the Site Design YAMLs
deckhand_path = "deckhand+" + deckhand_svc_endpoint
self.deckhand_design_ref = os.path.join(deckhand_path,
"revisions",
str(committed_revision_id),
"rendered-documents")
if self.deckhand_design_ref:
logging.info("Design YAMLs will be retrieved from %s",
self.deckhand_design_ref)
else:
raise AirflowException("Unable to Retrieve Design Reference!")
@shipyard_service_token
def _auth_gen(self):
# Generator method for the Drydock Session to use to get the
# auth headers necessary
return [('X-Auth-Token', self.svc_token)]
def create_task(self, task_action):
# Initialize Variables
create_task_response = {}
# Node Filter
logging.info("Nodes Filter List: %s", self.node_filter)
try:
# Create Task
create_task_response = self.drydock_client.create_task(
design_ref=self.deckhand_design_ref,
task_action=task_action,
node_filter=self.node_filter)
except errors.ClientError as client_error:
raise AirflowException(client_error)
# Retrieve Task ID
self.drydock_task_id = create_task_response['task_id']
logging.info('Drydock %s task ID is %s',
task_action, self.drydock_task_id)
# Raise Exception if we are not able to get the task_id from
# Drydock
if self.drydock_task_id:
return self.drydock_task_id
else:
raise AirflowException("Unable to create task!")
def query_task(self, interval, time_out):
        # Calculate the number of times to execute the 'for' loop.
        # Convert 'time_out' and 'interval' from string to integer; the
        # division yields a float, which we round to the nearest whole
        # number.
        end_range = round(int(time_out) / int(interval))

        # Initialize task state so that a failed first query cannot
        # leave these variables undefined.
        task_status = None
        task_result = None
logging.info('Task ID is %s', self.drydock_task_id)
# Query task status
for i in range(0, end_range + 1):
try:
# Retrieve current task state
task_state = self.drydock_client.get_task(
task_id=self.drydock_task_id)
task_status = task_state['status']
task_result = task_state['result']['status']
logging.info("Current status of task id %s is %s",
self.drydock_task_id, task_status)
except errors.ClientError as client_error:
raise AirflowException(client_error)
            except Exception:
                # There can be situations where intermittent network
                # issues prevent us from retrieving the task state. We
                # will want to retry in such situations.
                logging.warning("Unable to retrieve task state. Retrying...")
# Raise Time Out Exception
if task_status == 'running' and i == end_range:
raise AirflowException("Task Execution Timed Out!")
# Exit 'for' loop if the task is in 'complete' or 'terminated'
# state
if task_status in ['complete', 'terminated']:
logging.info('Task result is %s', task_result)
break
else:
time.sleep(int(interval))
# Get final task result
if task_result == 'success':
logging.info('Task id %s has been successfully completed',
self.drydock_task_id)
else:
raise AirflowException("Failed to execute/complete task!")
class DrydockBaseOperatorPlugin(AirflowPlugin):
"""Creates DrydockBaseOperator in Airflow."""
name = 'drydock_base_operator_plugin'
operators = [DrydockBaseOperator]
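
Two details in the base operator are worth spelling out. The design reference
is a composite URI: the deckhand+ prefix tells Drydock to fetch the rendered
documents through its Deckhand driver. And query_task polls up to
round(time_out / interval) + 1 times, sleeping interval seconds between
queries. A small illustration with assumed endpoint and revision values:

    import os

    # Assumed values, for illustration only.
    deckhand_svc_endpoint = 'http://deckhand-int:9000/api/v1.0'
    committed_revision_id = 3

    deckhand_design_ref = os.path.join('deckhand+' + deckhand_svc_endpoint,
                                       'revisions',
                                       str(committed_revision_id),
                                       'rendered-documents')
    print(deckhand_design_ref)
    # deckhand+http://deckhand-int:9000/api/v1.0/revisions/3/rendered-documents

    # Polling arithmetic: a 1800 s timeout with a 30 s interval yields
    # round(1800 / 30) = 60, i.e. up to 61 status queries.
    print(round(int('1800') / int('30')))  # 60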

View File

@@ -0,0 +1,69 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from airflow.plugins_manager import AirflowPlugin
from check_k8s_node_status import check_node_status
from drydock_base_operator import DrydockBaseOperator
class DrydockDeployNodesOperator(DrydockBaseOperator):
"""Drydock Deploy Nodes Operator
This operator will trigger drydock to deploy the bare metal
nodes
"""
def do_execute(self):
# Trigger DryDock to execute task
self.create_task('deploy_nodes')
# Retrieve query interval and timeout
q_interval = self.dc['physical_provisioner.deploy_interval']
task_timeout = self.dc['physical_provisioner.deploy_timeout']
# Query Task
self.query_task(q_interval, task_timeout)
# It takes time for the cluster join process to be triggered across
# all the nodes in the cluster. Hence there is a need to back off
# and wait before checking the state of the cluster join process.
join_wait = self.dc['physical_provisioner.join_wait']
logging.info("All nodes deployed in MAAS")
logging.info("Wait for %d seconds before checking node state...",
join_wait)
time.sleep(join_wait)
# Check that cluster join process is completed before declaring
# deploy_node as 'completed'.
node_st_timeout = self.dc['kubernetes.node_status_timeout']
node_st_interval = self.dc['kubernetes.node_status_interval']
check_node_status(node_st_timeout, node_st_interval)
class DrydockDeployNodesOperatorPlugin(AirflowPlugin):
"""Creates DrydockDeployNodesOperator in Airflow."""
name = 'drydock_deploy_nodes_operator'
operators = [DrydockDeployNodesOperator]
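
The self.dc mapping read here (and in the other task operators) is the
deployment configuration the base operator pulls from xcom. Illustrative keys
and values follow; the 120-second join_wait default is mentioned in the old
operator's comments, while the other values are assumptions:

    # Assumed example values; real values come from the deployment
    # configuration documents retrieved via XcomPuller.
    dc = {
        'physical_provisioner.deploy_interval': 30,
        'physical_provisioner.deploy_timeout': 3600,
        'physical_provisioner.join_wait': 120,
        'kubernetes.node_status_interval': 30,
        'kubernetes.node_status_timeout': 1800,
    }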

View File

@@ -0,0 +1,52 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from airflow.plugins_manager import AirflowPlugin
from drydock_base_operator import DrydockBaseOperator
class DrydockDestroyNodeOperator(DrydockBaseOperator):
"""Drydock Destroy Node Operator
This operator will trigger drydock to destroy a bare metal
node
"""
def do_execute(self):
        # Retrieve query interval and timeout (unused for now; retained
        # for when the Drydock API is ready for consumption)
        q_interval = self.dc['physical_provisioner.destroy_interval']
        task_timeout = self.dc['physical_provisioner.destroy_timeout']

        # NOTE: This is a placeholder function. The 'destroy_node'
        # functionality in Drydock is still being worked on and is not
        # ready at the moment.
logging.info("Destroying node %s from cluster...",
self.redeploy_server)
time.sleep(15)
logging.info("Successfully deleted node %s", self.redeploy_server)
class DrydockDestroyNodeOperatorPlugin(AirflowPlugin):
"""Creates DrydockDestroyNodeOperator in Airflow."""
name = 'drydock_destroy_node_operator'
operators = [DrydockDestroyNodeOperator]

View File

@@ -1,436 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import os
import requests
import time
from urllib.parse import urlparse
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
import drydock_provisioner.drydock_client.client as client
import drydock_provisioner.drydock_client.session as session
from check_k8s_node_status import check_node_status
from drydock_provisioner import error as errors
from service_endpoint import ucp_service_endpoint
from service_token import shipyard_service_token
from xcom_puller import XcomPuller
class DryDockOperator(BaseOperator):
"""DryDock Client"""
@apply_defaults
def __init__(self,
action=None,
design_ref=None,
main_dag_name=None,
node_filter=None,
shipyard_conf=None,
svc_token=None,
sub_dag_name=None,
xcom_push=True,
*args, **kwargs):
"""
:param action: Task to perform
:param design_ref: A URI reference to the design documents
:param main_dag_name: Parent Dag
:param node_filter: A filter for narrowing the scope of the task. Valid
fields are 'node_names', 'rack_names', 'node_tags'
:param shipyard_conf: Location of shipyard.conf
:param sub_dag_name: Child Dag
The Drydock operator assumes that prior steps have set xcoms for
the action and the deployment configuration
"""
super(DryDockOperator, self).__init__(*args, **kwargs)
self.action = action
self.design_ref = design_ref
self.main_dag_name = main_dag_name
self.node_filter = node_filter
self.shipyard_conf = shipyard_conf
self.svc_token = svc_token
self.sub_dag_name = sub_dag_name
self.xcom_push_flag = xcom_push
def execute(self, context):
# Initialize Variable
redeploy_server = None
# Placeholder definition
# TODO: Need to decide how to pass the required value from Shipyard to
# the 'node_filter' variable. No filter will be used for now.
self.node_filter = None
# Define task_instance
task_instance = context['task_instance']
# Set up and retrieve values from xcom
self.xcom_puller = XcomPuller(self.main_dag_name, task_instance)
self.action_info = self.xcom_puller.get_action_info()
self.dc = self.xcom_puller.get_deployment_configuration()
# Logs uuid of action performed by the Operator
logging.info("DryDock Operator for action %s", self.action_info['id'])
# Retrieve information of the server that we want to redeploy if user
# executes the 'redeploy_server' dag
# Set node filter to be the server that we want to redeploy
if self.action_info['dag_id'] == 'redeploy_server':
redeploy_server = self.action_info['parameters'].get('server-name')
if redeploy_server:
logging.info("Server to be redeployed is %s", redeploy_server)
self.node_filter = redeploy_server
else:
raise AirflowException('Unable to retrieve information of '
'node to be redeployed!')
# Retrieve Deckhand Design Reference
self.design_ref = self.get_deckhand_design_ref(context)
if self.design_ref:
logging.info("Drydock YAMLs will be retrieved from %s",
self.design_ref)
else:
raise AirflowException("Unable to Retrieve Design Reference!")
# Drydock Validate Site Design
if self.action == 'validate_site_design':
# Initialize variable
site_design_validity = 'invalid'
# Retrieve Endpoint Information
svc_type = 'physicalprovisioner'
drydock_svc_endpoint = ucp_service_endpoint(self,
svc_type=svc_type)
site_design_validity = self.drydock_validate_design(
drydock_svc_endpoint)
return site_design_validity
# DrydockClient
# Retrieve Endpoint Information
svc_type = 'physicalprovisioner'
drydock_svc_endpoint = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("DryDock endpoint is %s", drydock_svc_endpoint)
# Set up DryDock Client
drydock_client = self.drydock_session_client(drydock_svc_endpoint)
# Create Task for verify_site
if self.action == 'verify_site':
q_interval = self.dc['physical_provisioner.verify_interval']
task_timeout = self.dc['physical_provisioner.verify_timeout']
self.drydock_action(drydock_client, context, self.action,
q_interval, task_timeout)
# Create Task for prepare_site
elif self.action == 'prepare_site':
q_interval = self.dc['physical_provisioner.prepare_site_interval']
task_timeout = self.dc['physical_provisioner.prepare_site_timeout']
self.drydock_action(drydock_client, context, self.action,
q_interval, task_timeout)
# Create Task for prepare_node
elif self.action == 'prepare_nodes':
q_interval = self.dc['physical_provisioner.prepare_node_interval']
task_timeout = self.dc['physical_provisioner.prepare_node_timeout']
self.drydock_action(drydock_client, context, self.action,
q_interval, task_timeout)
# Create Task for deploy_node
elif self.action == 'deploy_nodes':
q_interval = self.dc['physical_provisioner.deploy_interval']
task_timeout = self.dc['physical_provisioner.deploy_timeout']
self.drydock_action(drydock_client, context, self.action,
q_interval, task_timeout)
# Wait for 120 seconds (default value) before checking the cluster
# join process as it takes time for process to be triggered across
# all nodes
join_wait = self.dc['physical_provisioner.join_wait']
logging.info("All nodes deployed in MAAS")
logging.info("Wait for %d seconds before checking node state...",
join_wait)
time.sleep(join_wait)
# Check that cluster join process is completed before declaring
# deploy_node as 'completed'.
node_st_timeout = self.dc['kubernetes.node_status_timeout']
node_st_interval = self.dc['kubernetes.node_status_interval']
check_node_status(node_st_timeout, node_st_interval)
# Create Task for destroy_node
# NOTE: This is a PlaceHolder function. The 'destroy_node'
# functionalities in DryDock is being worked on and is not
# ready at the moment.
elif self.action == 'destroy_node':
# see deployment_configuration_operator.py for defaults
q_interval = self.dc['physical_provisioner.destroy_interval']
task_timeout = self.dc['physical_provisioner.destroy_timeout']
logging.info("Destroying node %s from cluster...", redeploy_server)
time.sleep(15)
logging.info("Successfully deleted node %s", redeploy_server)
# TODO: Uncomment when the function to destroy/delete node is
# ready for consumption in Drydock
# self.drydock_action(drydock_client, context, self.action,
# q_interval, task_timeout)
# Do not perform any action
else:
logging.info('No Action to Perform')
@shipyard_service_token
def _auth_gen(self):
# Generator method for the Drydock Session to use to get the
# auth headers necessary
return [('X-Auth-Token', self.svc_token)]
def drydock_session_client(self, drydock_svc_endpoint):
# Initialize Variables
drydock_url = None
dd_session = None
dd_client = None
# Parse DryDock Service Endpoint
drydock_url = urlparse(drydock_svc_endpoint)
# Build a DrydockSession with credentials and target host
# information.
logging.info("Build DryDock Session")
dd_session = session.DrydockSession(drydock_url.hostname,
port=drydock_url.port,
auth_gen=self._auth_gen)
# Raise Exception if we are not able to get a drydock session
if dd_session:
logging.info("Successfully Set Up DryDock Session")
else:
raise AirflowException("Failed to set up Drydock Session!")
# Use session to build a DrydockClient to make one or more API calls
# The DrydockSession will care for TCP connection pooling
# and header management
logging.info("Create DryDock Client")
dd_client = client.DrydockClient(dd_session)
# Raise Exception if we are not able to build drydock client
if dd_client:
logging.info("Successfully Set Up DryDock client")
else:
raise AirflowException("Unable to set up Drydock Client!")
# Drydock client for XCOM Usage
return dd_client
def drydock_action(self, drydock_client, context, action, interval,
time_out):
# Trigger DryDock to execute task and retrieve task ID
task_id = self.drydock_perform_task(drydock_client, context,
action, self.node_filter)
logging.info('Task ID is %s', task_id)
# Query Task
self.drydock_query_task(drydock_client, context, interval,
time_out, task_id)
def drydock_perform_task(self, drydock_client, context,
perform_task, nodes_filter):
# Initialize Variables
create_task_response = {}
task_id = None
# Node Filter
logging.info("Nodes Filter List: %s", nodes_filter)
# Create Task
create_task_response = drydock_client.create_task(
design_ref=self.design_ref,
task_action=perform_task,
node_filter=nodes_filter)
# Retrieve Task ID
task_id = create_task_response.get('task_id')
logging.info('Drydock %s task ID is %s', perform_task, task_id)
# Raise Exception if we are not able to get the task_id from
# drydock
if task_id:
return task_id
else:
raise AirflowException("Unable to create task!")
def drydock_query_task(self, drydock_client, context, interval,
time_out, task_id):
# Initialize Variables
keystone_token_expired = False
new_dd_client = None
dd_client = drydock_client
# Calculate number of times to execute the 'for' loop
# Convert 'time_out' and 'interval' from string into integer
# The result from the division will be a floating number which
# We will round off to nearest whole number
end_range = round(int(time_out) / int(interval))
# Query task status
for i in range(0, end_range + 1):
if keystone_token_expired:
logging.info("Established new drydock session")
dd_client = new_dd_client
try:
# Retrieve current task state
task_state = dd_client.get_task(task_id=task_id)
task_status = task_state.get('status')
task_result = task_state.get('result')['status']
logging.info("Current status of task id %s is %s",
task_id, task_status)
keystone_token_expired = False
except errors.ClientUnauthorizedError as unauthorized_error:
# TODO: This is a temporary workaround. Drydock will be
# updated with the appropriate fix in the drydock api
# client by having the session detect a 401/403 response
# and refresh the token appropriately.
# Logs drydock client unauthorized error
keystone_token_expired = True
logging.error(unauthorized_error)
# Set up new drydock client with new keystone token
logging.info("Setting up new drydock session...")
drydock_svc_endpoint = ucp_service_endpoint(
self, svc_type='physicalprovisioner')
new_dd_client = self.drydock_session_client(
drydock_svc_endpoint)
except errors.ClientForbiddenError as forbidden_error:
raise AirflowException(forbidden_error)
except errors.ClientError as client_error:
raise AirflowException(client_error)
except:
# There can be instances where there are intermittent network
# issues that prevents us from retrieving the task state. We
# will want to retry in such situations.
logging.info("Unable to retrieve task state. Retrying...")
# Raise Time Out Exception
if task_status == 'running' and i == end_range:
raise AirflowException("Task Execution Timed Out!")
# Exit 'for' loop if the task is in 'complete' or 'terminated'
# state
if task_status in ['complete', 'terminated']:
logging.info('Task result is %s', task_result)
break
else:
time.sleep(int(interval))
# Get final task result
if task_result == 'success':
logging.info('Task id %s has been successfully completed',
self.task_id)
else:
raise AirflowException("Failed to execute/complete task!")
def get_deckhand_design_ref(self, context):
# Retrieve DeckHand Endpoint Information
svc_type = 'deckhand'
deckhand_svc_endpoint = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
committed_revision_id = self.xcom_puller.get_design_version()
# Form DeckHand Design Reference Path that we will use to retrieve
# the DryDock YAMLs
deckhand_path = "deckhand+" + deckhand_svc_endpoint
deckhand_design_ref = os.path.join(deckhand_path,
"revisions",
str(committed_revision_id),
"rendered-documents")
return deckhand_design_ref
@shipyard_service_token
def drydock_validate_design(self, drydock_svc_endpoint):
# Form Validation Endpoint
validation_endpoint = os.path.join(drydock_svc_endpoint,
'validatedesign')
logging.info("Validation Endpoint is %s", validation_endpoint)
# Define Headers and Payload
headers = {
'Content-Type': 'application/json',
'X-Auth-Token': self.svc_token
}
payload = {
'rel': "design",
'href': self.design_ref,
'type': "application/x-yaml"
}
# Requests DryDock to validate site design
logging.info("Waiting for DryDock to validate site design...")
try:
design_validate_response = requests.post(validation_endpoint,
headers=headers,
data=json.dumps(payload))
except requests.exceptions.RequestException as e:
raise AirflowException(e)
# Convert response to string
validate_site_design = design_validate_response.text
# Print response
logging.info("Retrieving DryDock validate site design response...")
logging.info(json.loads(validate_site_design))
# Check if site design is valid
if json.loads(validate_site_design).get('status') == 'Success':
logging.info("DryDock Site Design has been successfully validated")
return 'valid'
else:
raise AirflowException("DryDock Site Design Validation Failed!")
class DryDockClientPlugin(AirflowPlugin):
name = "drydock_client_plugin"
operators = [DryDockOperator]

View File

@@ -0,0 +1,47 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.plugins_manager import AirflowPlugin
from drydock_base_operator import DrydockBaseOperator
class DrydockPrepareNodesOperator(DrydockBaseOperator):
"""Drydock Prepare Nodes Operator
This operator will trigger drydock to prepare nodes for
site deployment
"""
def do_execute(self):
# Trigger DryDock to execute task
        self.create_task('prepare_nodes')
# Retrieve query interval and timeout
q_interval = self.dc['physical_provisioner.prepare_node_interval']
task_timeout = self.dc['physical_provisioner.prepare_node_timeout']
# Query Task
self.query_task(q_interval, task_timeout)
class DrydockPrepareNodesOperatorPlugin(AirflowPlugin):
"""Creates DrydockPrepareNodesOperator in Airflow."""
name = 'drydock_prepare_nodes_operator'
operators = [DrydockPrepareNodesOperator]

View File

@@ -0,0 +1,47 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.plugins_manager import AirflowPlugin
from drydock_base_operator import DrydockBaseOperator
class DrydockPrepareSiteOperator(DrydockBaseOperator):
"""Drydock Prepare Site Operator
This operator will trigger drydock to prepare site for
site deployment
"""
def do_execute(self):
# Trigger DryDock to execute task
self.create_task('prepare_site')
# Retrieve query interval and timeout
q_interval = self.dc['physical_provisioner.prepare_site_interval']
task_timeout = self.dc['physical_provisioner.prepare_site_timeout']
# Query Task
self.query_task(q_interval, task_timeout)
class DrydockPrepareSiteOperatorPlugin(AirflowPlugin):
"""Creates DrydockPrepareSiteOperator in Airflow."""
name = 'drydock_prepare_site_operator'
operators = [DrydockPrepareSiteOperator]

View File

@@ -0,0 +1,85 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import os
import requests
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from drydock_base_operator import DrydockBaseOperator
class DrydockValidateDesignOperator(DrydockBaseOperator):
"""Drydock Validate Design Operator
This operator will trigger drydock to validate the
site design
"""
def do_execute(self):
# Form Validation Endpoint
validation_endpoint = os.path.join(self.drydock_svc_endpoint,
'validatedesign')
logging.info("Validation Endpoint is %s", validation_endpoint)
# Define Headers and Payload
headers = {
'Content-Type': 'application/json',
'X-Auth-Token': self.svc_token
}
payload = {
'rel': "design",
'href': self.deckhand_design_ref,
'type': "application/x-yaml"
}
# Requests DryDock to validate site design
logging.info("Waiting for DryDock to validate site design...")
try:
design_validate_response = requests.post(validation_endpoint,
headers=headers,
data=json.dumps(payload))
except requests.exceptions.RequestException as e:
raise AirflowException(e)
# Convert response to string
validate_site_design = design_validate_response.text
# Print response
logging.info("Retrieving DryDock validate site design response...")
logging.info(json.loads(validate_site_design))
# Check if site design is valid
if json.loads(validate_site_design).get('status') == 'Success':
logging.info("DryDock Site Design has been successfully validated")
else:
raise AirflowException("DryDock Site Design Validation Failed!")
class DrydockValidateDesignOperatorPlugin(AirflowPlugin):
"""Creates DrydockValidateDesignOperator in Airflow."""
name = 'drydock_validate_design_operator'
operators = [DrydockValidateDesignOperator]
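
For context, the success check above keys off a top-level status field in the
response body from the validatedesign endpoint. A passing exchange looks
roughly like this; the field names besides 'status' are assumptions, not
taken from this commit:

    # Illustrative only: approximate shape of a passing validatedesign
    # response; the operator merely checks response.get('status') == 'Success'.
    response = {
        'kind': 'Status',
        'status': 'Success',
        'reason': 'Validation',
        'details': {'errorCount': 0, 'messageList': []},
    }
    assert response.get('status') == 'Success'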

View File

@@ -0,0 +1,46 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.plugins_manager import AirflowPlugin
from drydock_base_operator import DrydockBaseOperator
class DrydockVerifySiteOperator(DrydockBaseOperator):
"""Drydock Verify Site Operator
This operator will trigger drydock to verify site
"""
def do_execute(self):
# Trigger DryDock to execute task
self.create_task('verify_site')
# Retrieve query interval and timeout
q_interval = self.dc['physical_provisioner.verify_interval']
task_timeout = self.dc['physical_provisioner.verify_timeout']
# Query Task
self.query_task(q_interval, task_timeout)
class DrydockVerifySiteOperatorPlugin(AirflowPlugin):
"""Creates DrydockVerifySiteOperator in Airflow."""
name = 'drydock_verify_site_operator'
operators = [DrydockVerifySiteOperator]