Add UCP Base Operator

1) Refactor Drydock Base Operator to make use of the
   UCP Base Operator instead

2) Dump logs from Drydock Pods when there are Exceptions

Change-Id: I3fbe03d13b5fc89a503cfb2c3c25751076718554
Anthony Lin 2018-04-16 10:43:42 +00:00
parent 83d91689aa
commit b9b0e27de0
12 changed files with 248 additions and 100 deletions

View File

@@ -389,6 +389,8 @@ conf:
airflow:
worker_endpoint_scheme: 'http'
worker_port: 8793
k8s_logs:
ucp_namespace: 'ucp'
airflow_config_file:
path: /usr/local/airflow/airflow.cfg
airflow:

View File

@@ -249,6 +249,16 @@
#auth_section = <None>
[k8s_logs]
#
# From shipyard_airflow
#
# The namespace of the UCP Pods (string value)
#ucp_namespace = ucp
[logging]
#

View File

@@ -202,6 +202,17 @@ SECTIONS = [
),
]
),
ConfigSection(
name='k8s_logs',
title='Parameters for K8s Pods Logs',
options=[
cfg.StrOpt(
'ucp_namespace',
default='ucp',
help='Namespace of UCP Pods'
),
]
),
]
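The new section above follows the standard oslo.config pattern used by this config generator. A minimal standalone sketch of how such a group and option are typically registered and read with oslo.config — only the group and option names come from this change; the rest is illustrative:

from oslo_config import cfg

CONF = cfg.CONF

# Group and option names taken from the ConfigSection above.
k8s_logs_group = cfg.OptGroup(name='k8s_logs',
                              title='Parameters for K8s Pods Logs')
k8s_logs_opts = [
    cfg.StrOpt('ucp_namespace',
               default='ucp',
               help='Namespace of UCP Pods'),
]

CONF.register_group(k8s_logs_group)
CONF.register_opts(k8s_logs_opts, group=k8s_logs_group)

if __name__ == '__main__':
    CONF(args=[], project='shipyard')  # parse config sources
    print(CONF.k8s_logs.ucp_namespace)  # -> 'ucp' unless overridden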

View File

@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import os
@@ -19,7 +18,6 @@ import time
from urllib.parse import urlparse
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
@@ -28,10 +26,12 @@ import drydock_provisioner.drydock_client.session as session
from drydock_provisioner import error as errors
from service_endpoint import ucp_service_endpoint
from service_token import shipyard_service_token
from xcom_puller import XcomPuller
from ucp_base_operator import UcpBaseOperator
LOG = logging.getLogger(__name__)
class DrydockBaseOperator(BaseOperator):
class DrydockBaseOperator(UcpBaseOperator):
"""Drydock Base Operator
@@ -49,14 +49,10 @@ class DrydockBaseOperator(BaseOperator):
drydock_svc_endpoint=None,
drydock_svc_type='physicalprovisioner',
drydock_task_id=None,
main_dag_name=None,
node_filter=None,
redeploy_server=None,
shipyard_conf=None,
sub_dag_name=None,
svc_session=None,
svc_token=None,
xcom_push=True,
*args, **kwargs):
"""Initialization of DrydockBaseOperator object.
@@ -66,62 +62,39 @@ class DrydockBaseOperator(BaseOperator):
:param drydock_svc_endpoint: Drydock Service Endpoint
:param drydock_svc_type: Drydock Service Type
:param drydock_task_id: Drydock Task ID
:param main_dag_name: Parent Dag
:param node_filter: A filter for narrowing the scope of the task.
Valid fields are 'node_names', 'rack_names',
'node_tags'. Note that node filter is turned
off by default, i.e. all nodes will be deployed.
:param redeploy_server: Server to be redeployed
:param shipyard_conf: Location of shipyard.conf
:param sub_dag_name: Child Dag
:param svc_session: Keystone Session
:param svc_token: Keystone Token
:param xcom_push: xcom usage
The Drydock operator assumes that prior steps have set xcoms for
the action and the deployment configuration
"""
super(DrydockBaseOperator, self).__init__(*args, **kwargs)
super(DrydockBaseOperator,
self).__init__(
pod_selector_pattern=[{'pod_pattern': 'drydock-api',
'container': 'drydock-api'}],
*args, **kwargs)
self.deckhand_design_ref = deckhand_design_ref
self.deckhand_svc_type = deckhand_svc_type
self.drydock_client = drydock_client
self.drydock_svc_endpoint = drydock_svc_endpoint
self.drydock_svc_type = drydock_svc_type
self.drydock_task_id = drydock_task_id
self.main_dag_name = main_dag_name
self.node_filter = node_filter
self.redeploy_server = redeploy_server
self.shipyard_conf = shipyard_conf
self.sub_dag_name = sub_dag_name
self.svc_session = svc_session
self.svc_token = svc_token
self.xcom_push_flag = xcom_push
def execute(self, context):
# Execute drydock base function
self.drydock_base(context)
# Execute child function
self.do_execute()
def drydock_base(self, context):
# Initialize Variables
drydock_url = None
dd_session = None
# Define task_instance
task_instance = context['task_instance']
# Set up and retrieve values from xcom
self.xcom_puller = XcomPuller(self.main_dag_name, task_instance)
self.action_info = self.xcom_puller.get_action_info()
self.dc = self.xcom_puller.get_deployment_configuration()
def run_base(self, context):
# Logs uuid of action performed by the Operator
logging.info("DryDock Operator for action %s", self.action_info['id'])
LOG.info("DryDock Operator for action %s", self.action_info['id'])
# Retrieve information of the server that we want to redeploy if user
# executes the 'redeploy_server' dag
@@ -131,18 +104,19 @@ class DrydockBaseOperator(BaseOperator):
self.action_info['parameters']['server-name'])
if self.redeploy_server:
logging.info("Server to be redeployed is %s",
self.redeploy_server)
LOG.info("Server to be redeployed is %s",
self.redeploy_server)
self.node_filter = self.redeploy_server
else:
raise AirflowException('Unable to retrieve information of '
'node to be redeployed!')
raise AirflowException('%s was unable to retrieve the '
'server to be redeployed.'
% self.__class__.__name__)
# Retrieve Endpoint Information
self.drydock_svc_endpoint = ucp_service_endpoint(
self, svc_type=self.drydock_svc_type)
logging.info("Drydock endpoint is %s", self.drydock_svc_endpoint)
LOG.info("Drydock endpoint is %s", self.drydock_svc_endpoint)
# Parse DryDock Service Endpoint
drydock_url = urlparse(self.drydock_svc_endpoint)
@@ -151,25 +125,25 @@ class DrydockBaseOperator(BaseOperator):
# information.
# The DrydockSession will care for TCP connection pooling
# and header management
logging.info("Build DryDock Session")
LOG.info("Build DryDock Session")
dd_session = session.DrydockSession(drydock_url.hostname,
port=drydock_url.port,
auth_gen=self._auth_gen)
# Raise Exception if we are not able to set up the session
if dd_session:
logging.info("Successfully Set Up DryDock Session")
LOG.info("Successfully Set Up DryDock Session")
else:
raise AirflowException("Failed to set up Drydock Session!")
# Use the DrydockSession to build a DrydockClient that can
# be used to make one or more API calls
logging.info("Create DryDock Client")
LOG.info("Create DryDock Client")
self.drydock_client = client.DrydockClient(dd_session)
# Raise Exception if we are not able to build the client
if self.drydock_client:
logging.info("Successfully Set Up DryDock client")
LOG.info("Successfully Set Up DryDock client")
else:
raise AirflowException("Failed to set up Drydock Client!")
@@ -177,7 +151,7 @@ class DrydockBaseOperator(BaseOperator):
deckhand_svc_endpoint = ucp_service_endpoint(
self, svc_type=self.deckhand_svc_type)
logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
LOG.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
# Retrieve last committed revision id
committed_revision_id = self.xcom_puller.get_design_version()
@@ -190,8 +164,8 @@ class DrydockBaseOperator(BaseOperator):
str(committed_revision_id),
"rendered-documents")
if self.deckhand_design_ref:
logging.info("Design YAMLs will be retrieved from %s",
self.deckhand_design_ref)
LOG.info("Design YAMLs will be retrieved from %s",
self.deckhand_design_ref)
else:
raise AirflowException("Unable to Retrieve Design Reference!")
@@ -207,7 +181,7 @@ class DrydockBaseOperator(BaseOperator):
create_task_response = {}
# Node Filter
logging.info("Nodes Filter List: %s", self.node_filter)
LOG.info("Nodes Filter List: %s", self.node_filter)
try:
# Create Task
@@ -217,12 +191,15 @@ class DrydockBaseOperator(BaseOperator):
node_filter=self.node_filter)
except errors.ClientError as client_error:
# Dump logs from Drydock pods
self.get_k8s_logs()
raise AirflowException(client_error)
# Retrieve Task ID
self.drydock_task_id = create_task_response['task_id']
logging.info('Drydock %s task ID is %s',
task_action, self.drydock_task_id)
LOG.info('Drydock %s task ID is %s',
task_action, self.drydock_task_id)
# Raise Exception if we are not able to get the task_id from
# Drydock
@@ -239,7 +216,7 @@ class DrydockBaseOperator(BaseOperator):
# We will round off to nearest whole number
end_range = round(int(time_out) / int(interval))
logging.info('Task ID is %s', self.drydock_task_id)
LOG.info('Task ID is %s', self.drydock_task_id)
# Query task status
for i in range(0, end_range + 1):
@@ -251,17 +228,20 @@ class DrydockBaseOperator(BaseOperator):
task_status = task_state['status']
task_result = task_state['result']['status']
logging.info("Current status of task id %s is %s",
self.drydock_task_id, task_status)
LOG.info("Current status of task id %s is %s",
self.drydock_task_id, task_status)
except errors.ClientError as client_error:
# Dump logs from Drydock pods
self.get_k8s_logs()
raise AirflowException(client_error)
except:
# There can be situations where there are intermittent network
# issues that prevent us from retrieving the task state. We
# will want to retry in such situations.
logging.warning("Unable to retrieve task state. Retrying...")
LOG.warning("Unable to retrieve task state. Retrying...")
# Raise Time Out Exception
if task_status == 'running' and i == end_range:
@@ -270,21 +250,23 @@ class DrydockBaseOperator(BaseOperator):
# Exit 'for' loop if the task is in 'complete' or 'terminated'
# state
if task_status in ['complete', 'terminated']:
logging.info('Task result is %s', task_result)
LOG.info('Task result is %s', task_result)
break
else:
time.sleep(int(interval))
# Get final task result
if task_result == 'success':
logging.info('Task id %s has been successfully completed',
self.drydock_task_id)
LOG.info('Task id %s has been successfully completed',
self.drydock_task_id)
else:
self.task_failure(True)
def task_failure(self, _task_failure):
# Dump logs from Drydock pods
self.get_k8s_logs()
logging.info('Retrieving all tasks records from Drydock...')
LOG.info('Retrieving all tasks records from Drydock...')
try:
# Get all tasks records
@@ -304,39 +286,38 @@ class DrydockBaseOperator(BaseOperator):
# Since there is only 1 failed parent task, we will print index 0
# of the list
if failed_task:
logging.error('%s task has either failed or timed out',
failed_task[0]['action'])
LOG.error('%s task has either failed or timed out',
failed_task[0]['action'])
logging.error(json.dumps(failed_task[0],
indent=4,
sort_keys=True))
LOG.error(json.dumps(failed_task[0],
indent=4,
sort_keys=True))
# Get the list of subtasks belonging to the failed parent task
subtask_id_list = failed_task[0]['subtask_id_list']
logging.info("Printing information of failed sub-tasks...")
LOG.info("Printing information of failed sub-tasks...")
# Print detailed information of failed step(s) under each subtask
# This will help to provide additional information for troubleshooting
# purposes.
for subtask_id in subtask_id_list:
logging.info("Retrieving details of subtask %s...",
subtask_id)
LOG.info("Retrieving details of subtask %s...", subtask_id)
# Retrieve task information
task = all_task_ids.get(subtask_id)
if task:
# Print subtask action and state
logging.info("%s subtask is in %s state",
task['action'],
task['result']['status'])
LOG.info("%s subtask is in %s state",
task['action'],
task['result']['status'])
# Print list containing steps in failure state
if task['result']['failures']:
logging.error("The following steps have failed:")
logging.error(task['result']['failures'])
LOG.error("The following steps have failed:")
LOG.error(task['result']['failures'])
message_list = (
task['result']['details']['messageList'] or [])
@@ -346,12 +327,12 @@ class DrydockBaseOperator(BaseOperator):
is_error = message['error'] is True
if is_error:
logging.error(json.dumps(message,
indent=4,
sort_keys=True))
LOG.error(json.dumps(message,
indent=4,
sort_keys=True))
else:
logging.info("No failed step detected for subtask %s",
subtask_id)
LOG.info("No failed step detected for subtask %s",
subtask_id)
else:
raise AirflowException("Unable to retrieve subtask info!")

View File

@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
@@ -20,6 +19,8 @@ from airflow.plugins_manager import AirflowPlugin
from check_k8s_node_status import check_node_status
from drydock_base_operator import DrydockBaseOperator
LOG = logging.getLogger(__name__)
class DrydockDeployNodesOperator(DrydockBaseOperator):
@@ -47,9 +48,9 @@ class DrydockDeployNodesOperator(DrydockBaseOperator):
# and wait before checking the state of the cluster join process.
join_wait = self.dc['physical_provisioner.join_wait']
logging.info("All nodes deployed in MAAS")
logging.info("Wait for %d seconds before checking node state...",
join_wait)
LOG.info("All nodes deployed in MAAS")
LOG.info("Wait for %d seconds before checking node state...",
join_wait)
time.sleep(join_wait)

View File

@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
@@ -19,6 +18,8 @@ from airflow.plugins_manager import AirflowPlugin
from drydock_base_operator import DrydockBaseOperator
LOG = logging.getLogger(__name__)
class DrydockDestroyNodeOperator(DrydockBaseOperator):
@@ -31,17 +32,13 @@ class DrydockDestroyNodeOperator(DrydockBaseOperator):
def do_execute(self):
# Retrieve query interval and timeout
q_interval = self.dc['physical_provisioner.destroy_interval']
task_timeout = self.dc['physical_provisioner.destroy_timeout']
# NOTE: This is a placeholder function. The 'destroy_node'
# functionality in DryDock is being worked on and is not
# ready at the moment.
logging.info("Destroying node %s from cluster...",
self.redeploy_server)
LOG.info("Destroying node %s from cluster...",
self.redeploy_server)
time.sleep(15)
logging.info("Successfully deleted node %s", self.redeploy_server)
LOG.info("Successfully deleted node %s", self.redeploy_server)
class DrydockDestroyNodeOperatorPlugin(AirflowPlugin):

View File

@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.plugins_manager import AirflowPlugin
from drydock_base_operator import DrydockBaseOperator

View File

@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.plugins_manager import AirflowPlugin
from drydock_base_operator import DrydockBaseOperator

View File

@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import os
@@ -22,6 +21,8 @@ from airflow.exceptions import AirflowException
from drydock_base_operator import DrydockBaseOperator
LOG = logging.getLogger(__name__)
class DrydockValidateDesignOperator(DrydockBaseOperator):
@@ -38,7 +39,7 @@ class DrydockValidateDesignOperator(DrydockBaseOperator):
validation_endpoint = os.path.join(self.drydock_svc_endpoint,
'validatedesign')
logging.info("Validation Endpoint is %s", validation_endpoint)
LOG.info("Validation Endpoint is %s", validation_endpoint)
# Define Headers and Payload
headers = {
@@ -53,7 +54,7 @@ class DrydockValidateDesignOperator(DrydockBaseOperator):
}
# Requests DryDock to validate site design
logging.info("Waiting for DryDock to validate site design...")
LOG.info("Waiting for DryDock to validate site design...")
try:
design_validate_response = requests.post(validation_endpoint,
@@ -67,15 +68,18 @@ class DrydockValidateDesignOperator(DrydockBaseOperator):
validate_site_design = design_validate_response.text
# Print response
logging.info("Retrieving DryDock validate site design response...")
logging.info(json.loads(validate_site_design))
LOG.info("Retrieving DryDock validate site design response...")
LOG.info(json.loads(validate_site_design))
# Check if site design is valid
status = str(json.loads(validate_site_design).get('status',
'unspecified'))
if status.lower() == 'success':
logging.info("DryDock Site Design has been successfully validated")
LOG.info("DryDock Site Design has been successfully validated")
else:
# Dump logs from Drydock pods
self.get_k8s_logs()
raise AirflowException("DryDock Site Design Validation Failed "
"with status: {}!".format(status))

View File

@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.plugins_manager import AirflowPlugin
from drydock_base_operator import DrydockBaseOperator

View File

@@ -0,0 +1,143 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import configparser
import logging
import math
from datetime import datetime
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from get_k8s_logs import get_pod_logs
from get_k8s_logs import K8sLoggingException
from xcom_puller import XcomPuller
LOG = logging.getLogger(__name__)
class UcpBaseOperator(BaseOperator):
"""UCP Base Operator
All UCP related workflow operators will use the UCP base
operator as the parent and inherit attributes and methods
from this class
"""
@apply_defaults
def __init__(self,
main_dag_name=None,
pod_selector_pattern=None,
shipyard_conf=None,
start_time=None,
sub_dag_name=None,
xcom_push=True,
*args, **kwargs):
"""Initialization of UcpBaseOperator object.
:param main_dag_name: Parent Dag
:param pod_selector_pattern: A list containing the information on
the Pod name pattern and the name
of the associated container for log
queries. This allows us to query
multiple components, e.g. MAAS and
Drydock, at the same time. It also
allows us to query the logs of a
specific container in Pods with
multiple containers. For instance,
the Airflow worker Pod contains both
the airflow-worker container and the
log-rotate container.
:param shipyard_conf: Location of shipyard.conf
:param start_time: Time when Operator gets executed
:param sub_dag_name: Child Dag
:param xcom_push: xcom usage
"""
super(UcpBaseOperator, self).__init__(*args, **kwargs)
self.main_dag_name = main_dag_name
self.pod_selector_pattern = pod_selector_pattern or []
self.shipyard_conf = shipyard_conf
self.start_time = datetime.now()
self.sub_dag_name = sub_dag_name
self.xcom_push_flag = xcom_push
def execute(self, context):
# Execute UCP base function
self.ucp_base(context)
# Execute base function
self.run_base(context)
# Execute child function
self.do_execute()
def ucp_base(self, context):
LOG.info("Running UCP Base Operator...")
# Read and parse shipyard.conf
config = configparser.ConfigParser()
config.read(self.shipyard_conf)
# Initialize variable
self.ucp_namespace = config.get('k8s_logs', 'ucp_namespace')
# Define task_instance
task_instance = context['task_instance']
# Set up and retrieve values from xcom
self.xcom_puller = XcomPuller(self.main_dag_name, task_instance)
self.action_info = self.xcom_puller.get_action_info()
self.dc = self.xcom_puller.get_deployment_configuration()
def get_k8s_logs(self):
"""Retrieve Kubernetes pod/container logs specified by an opererator
This method is "best effort" and should not prevent the progress of
the workflow processing
"""
if self.pod_selector_pattern:
for selector in self.pod_selector_pattern:
# Get the difference between the current time and the time
# when the operator was first executed (in seconds)
t_diff = (datetime.now() - self.start_time).total_seconds()
# Note that 't_diff' will be a floating point number and needs
# to be rounded up to the nearest integer
t_diff_int = int(math.ceil(t_diff))
try:
get_pod_logs(selector['pod_pattern'],
self.ucp_namespace,
selector['container'],
t_diff_int)
except K8sLoggingException as e:
LOG.error(e)
else:
LOG.debug("There are no pod logs specified to retrieve")
class UcpBaseOperatorPlugin(AirflowPlugin):
"""Creates UcpBaseOperator in Airflow."""
name = 'ucp_base_operator_plugin'
operators = [UcpBaseOperator]
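The get_k8s_logs module imported above is not shown in this commit view. A hedged sketch of what a get_pod_logs helper with this call signature might look like, assuming the kubernetes Python client and in-cluster credentials — the implementation details below are assumptions, not the module's actual code:

import logging

from kubernetes import client, config

LOG = logging.getLogger(__name__)


class K8sLoggingException(Exception):
    """Raised when pod logs cannot be retrieved."""


def get_pod_logs(pod_pattern, namespace, container, since_seconds):
    """Dump the logs of matching pods over the last since_seconds."""
    try:
        config.load_incluster_config()  # assumes in-cluster execution
        core_v1 = client.CoreV1Api()
        for pod in core_v1.list_namespaced_pod(namespace).items:
            if pod_pattern in pod.metadata.name:
                logs = core_v1.read_namespaced_pod_log(
                    name=pod.metadata.name,
                    namespace=namespace,
                    container=container,
                    since_seconds=since_seconds)
                LOG.info("Logs of container %s in pod %s:\n%s",
                         container, pod.metadata.name, logs)
    except Exception as ex:
        raise K8sLoggingException(ex)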

View File

@@ -40,6 +40,8 @@ project_domain_name = default
project_name = service
user_domain_name = default
username = shipyard
[k8s_logs]
ucp_namespace = ucp
[requests_config]
airflow_log_connect_timeout = 5
airflow_log_read_timeout = 300