# drydock/drydock_provisioner/orchestrator/orchestrator.py
# (source listing metadata: 497 lines, 20 KiB, Python)

# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Workflow orchestrator for Drydock tasks."""
import time
import importlib
import logging
import uuid
import concurrent.futures
import drydock_provisioner.config as config
import drydock_provisioner.objects as objects
import drydock_provisioner.error as errors
import drydock_provisioner.objects.fields as hd_fields
from .actions.orchestrator import Noop
from .actions.orchestrator import ValidateDesign
from .actions.orchestrator import VerifySite
from .actions.orchestrator import PrepareSite
from .actions.orchestrator import VerifyNodes
from .actions.orchestrator import PrepareNodes
from .actions.orchestrator import DeployNodes
from .actions.orchestrator import DestroyNodes
class Orchestrator(object):
    """Defines functionality for task execution workflow."""

    def __init__(self, enabled_drivers=None, state_manager=None,
                 ingester=None):
        """Initialize the orchestrator. A single instance should be executing at a time.

        :param enabled_drivers: object exposing ``oob_driver`` (one dotted
            class path or an iterable of them), ``node_driver`` and
            ``network_driver`` (a dotted class path or None) attributes naming
            the drivers to enable for executing downstream tasks
        :param state_manager: the instance of statemgr.state.DrydockState to use for accessing app state
        :param ingester: instance of ingester.Ingester used to process design documents
        :raises errors.OrchestratorError: if state_manager or ingester is None
        """
        self.orch_id = uuid.uuid4()
        self.stop_flag = False
        self.enabled_drivers = {}
        self.state_manager = state_manager
        self.ingester = ingester

        if self.state_manager is None or self.ingester is None:
            raise errors.OrchestratorError(
                "Orchestrator requires instantiated state manager and ingester."
            )

        self.logger = logging.getLogger('drydock.orchestrator')

        if enabled_drivers is None:
            return

        oob_drivers = enabled_drivers.oob_driver

        # This is because oslo_config changes the option value
        # for multiopt depending on if multiple values are actually defined.
        # Normalize so a single dotted path is not iterated per character.
        if oob_drivers is None:
            oob_drivers = []
        elif isinstance(oob_drivers, str):
            oob_drivers = [oob_drivers]

        for d in oob_drivers:
            self.logger.info("Enabling OOB driver %s" % d)
            oob_driver = self._instantiate_driver(d)
            if oob_driver is not None:
                self.enabled_drivers.setdefault('oob', []).append(oob_driver)

        node_driver = self._instantiate_driver(enabled_drivers.node_driver)
        if node_driver is not None:
            self.enabled_drivers['node'] = node_driver

        network_driver = self._instantiate_driver(
            enabled_drivers.network_driver)
        if network_driver is not None:
            self.enabled_drivers['network'] = network_driver

    def _instantiate_driver(self, driver_path):
        """Import and instantiate the driver class named by a dotted path.

        :param driver_path: ``package.module.ClassName`` string, or None
        :returns: the driver instance, or None when ``driver_path`` is None or
            the imported module has no attribute with the class name
        """
        if driver_path is None:
            return None

        module_name, class_name = driver_path.rsplit('.', 1)
        driver_class = getattr(
            importlib.import_module(module_name), class_name, None)

        if driver_class is None:
            return None

        return driver_class(
            state_manager=self.state_manager, orchestrator=self)

    def watch_for_tasks(self):
        """Start polling the database watching for Queued tasks to execute.

        Runs until ``stop_flag`` is set. The outer loop tries to claim
        leadership through the state manager; once claimed, the inner loop
        pulls one queued task at a time, submits the matching action's
        ``start`` to a thread pool, and re-asserts leadership after every
        poll interval. Losing leadership drops back to the outer loop.
        """
        orch_task_actions = {
            hd_fields.OrchestratorAction.Noop: Noop,
            hd_fields.OrchestratorAction.ValidateDesign: ValidateDesign,
            hd_fields.OrchestratorAction.VerifySite: VerifySite,
            hd_fields.OrchestratorAction.PrepareSite: PrepareSite,
            hd_fields.OrchestratorAction.VerifyNodes: VerifyNodes,
            hd_fields.OrchestratorAction.PrepareNodes: PrepareNodes,
            hd_fields.OrchestratorAction.DeployNodes: DeployNodes,
            hd_fields.OrchestratorAction.DestroyNodes: DestroyNodes,
        }

        # Loop trying to claim status as the active orchestrator
        tp = concurrent.futures.ThreadPoolExecutor(max_workers=16)

        while True:
            if self.stop_flag:
                tp.shutdown()
                return

            claim = self.state_manager.claim_leadership(self.orch_id)

            if not claim:
                self.logger.info(
                    "Orchestrator %s denied leadership, sleeping to try again."
                    % str(self.orch_id))
                # TODO(sh8121att) Make this configurable
                time.sleep(300)
                continue

            self.logger.info(
                "Orchestrator %s successfully claimed leadership, polling for tasks."
                % str(self.orch_id))

            # As active orchestrator, loop looking for queued tasks.
            task_future = None
            while True:
                # TODO(sh8121att) Need a timeout here
                if self.stop_flag:
                    tp.shutdown()
                    self.state_manager.abdicate_leadership(self.orch_id)
                    return

                if task_future is not None and task_future.done():
                    self.logger.debug(
                        "Task execution complete, looking for the next task."
                    )
                    exc = task_future.exception()
                    if exc is not None:
                        self.logger.error(
                            "Error in starting orchestrator action.",
                            exc_info=exc)
                    task_future = None

                # Only one task runs at a time; look for more work only
                # when nothing is in flight.
                if task_future is None:
                    next_task = self.state_manager.get_next_queued_task(
                        allowed_actions=list(orch_task_actions.keys()))

                    if next_task is not None:
                        self.logger.info(
                            "Found task %s queued, starting execution." %
                            str(next_task.get_id()))
                        if next_task.check_terminate():
                            self.logger.info(
                                "Task %s marked for termination, skipping execution."
                                % str(next_task.get_id()))
                            next_task.set_status(
                                hd_fields.TaskStatus.Terminated)
                            next_task.save()
                            continue

                        # Use .get() so an action missing from the table
                        # reaches the unsupported-action handling below
                        # instead of raising KeyError out of the loop.
                        action_class = orch_task_actions.get(next_task.action)
                        action = None
                        if action_class is not None:
                            action = action_class(next_task, self,
                                                  self.state_manager)

                        if action:
                            task_future = tp.submit(action.start)
                        else:
                            self.logger.warning(
                                "Task %s has unsupported action %s, ending execution."
                                % (str(next_task.get_id()),
                                   next_task.action))
                            next_task.add_status_msg(
                                msg="Unsupported action %s." %
                                next_task.action,
                                error=True,
                                ctx=str(next_task.get_id()),
                                ctx_type='task')
                            next_task.failure()
                            next_task.set_status(
                                hd_fields.TaskStatus.Complete)
                            next_task.save()
                    else:
                        self.logger.info(
                            "No task found, waiting to poll again.")

                # Wait one configured poll interval, then re-assert leadership
                time.sleep(config.config_mgr.conf.poll_interval)
                claim = self.state_manager.maintain_leadership(
                    self.orch_id)
                if not claim:
                    self.logger.info(
                        "Orchestrator %s lost leadership, attempting to reclaim."
                        % str(self.orch_id))
                    break

    def stop_orchestrator(self):
        """Indicate this orchestrator instance should stop attempting to run."""
        self.stop_flag = True

    def terminate_task(self, task, propagate=True, terminated_by=None):
        """Mark a task for termination.

        Optionally propagate the termination recursively to all subtasks

        :param task: A objects.Task instance to terminate
        :param propagate: whether the termination should propagate to subtasks
        :param terminated_by: passed through to ``task.terminate_task`` for
            the task and every subtask
        :raises errors.OrchestratorError: if ``task`` is None or a subtask id
            cannot be resolved through the state manager
        """
        if task is None:
            # There is no task to ask for an id, so none is reported here
            raise errors.OrchestratorError(
                "Could not find task to terminate.")

        # Terminate initial task first to prevent add'l subtasks
        self.logger.debug("Terminating task %s." % str(task.get_id()))
        task.terminate_task(terminated_by=terminated_by)

        if propagate:
            for st_id in task.get_subtasks():
                st = self.state_manager.get_task(st_id)
                if st is None:
                    raise errors.OrchestratorError(
                        "Could not find task %s" % str(st_id))
                self.terminate_task(
                    st, propagate=True, terminated_by=terminated_by)

    def create_task(self, **kwargs):
        """Create a new task and persist it.

        :param kwargs: forwarded to the objects.Task constructor
        :returns: the new objects.Task after posting it to the state manager
        """
        new_task = objects.Task(statemgr=self.state_manager, **kwargs)
        self.state_manager.post_task(new_task)

        return new_task

    def compute_model_inheritance(self, site_design):
        """Compute inheritance of the design model.

        Given a fully populated Site model, compute the effective
        design by applying inheritance and references

        :param site_design: design whose ``baremetal_nodes`` (if any) each get
            ``compile_applied_model(site_design)`` called on them
        """
        # ``or []`` tolerates a design whose node list is present but None
        for n in getattr(site_design, 'baremetal_nodes', None) or []:
            n.compile_applied_model(site_design)

        return

    def get_described_site(self, design_ref):
        """Ingest design data referenced by design_ref.

        Return a tuple of the processing status and the populated instance
        of SiteDesign

        :param design_ref: Supported URI referencing a design document
        """
        status, site_design = self.ingester.ingest_data(
            design_ref=design_ref, design_state=self.state_manager)

        return status, site_design

    def _validate_design(self, site_design, result_status=None):
        """Validate the design in site_design passes all validation rules.

        Apply all validation rules to the design in site_design. If result_status is
        defined, update it with validation messages. Otherwise a new status instance
        will be created and returned.

        :param site_design: instance of objects.SiteDesign
        :param result_status: instance of objects.TaskStatus
        :returns: the status instance, marked as Success
        """
        # TODO(sh8121att) actually implement the validation rules defined in the readme

        # Create a status only when the caller did not supply one, so an
        # existing status (and its messages) is updated rather than replaced.
        if result_status is None:
            result_status = objects.TaskStatus()

        result_status.set_status(hd_fields.ActionResult.Success)
        return result_status

    def get_effective_site(self, design_ref):
        """Ingest design data and compile the effective model of the design.

        Return a tuple of the processing status and the populated instance
        of SiteDesign after computing the inheritance chain. Exceptions are
        not propagated: they are recorded on the status as a Failure when a
        status exists, and logged. If ingestion itself raised, the returned
        status is None.

        :param design_ref: Supported URI referencing a design document
        """
        status = None
        site_design = None
        try:
            status, site_design = self.get_described_site(design_ref)
            if status.status == hd_fields.ActionResult.Success:
                self.compute_model_inheritance(site_design)
                status = self._validate_design(site_design, result_status=status)
        except Exception as ex:
            if status is not None:
                status.add_status_msg(
                    "Error loading effective site: %s" % str(ex),
                    error=True,
                    ctx='NA',
                    ctx_type='NA')
                status.set_status(hd_fields.ActionResult.Failure)
            self.logger.error(
                "Error getting site definition: %s" % str(ex), exc_info=ex)
        else:
            # Report success only when ingestion and validation succeeded;
            # a non-success ingestion status must not be overwritten.
            if status.status == hd_fields.ActionResult.Success:
                status.add_status_msg(
                    msg="Successfully computed effective design.",
                    error=False,
                    ctx_type='NA',
                    ctx='NA')

        return status, site_design

    def get_target_nodes(self, task, failures=False, successes=False):
        """Compute list of target nodes for given ``task``.

        If failures is true, then create a node_filter based on task result
        failures. If successes is true, then create a node_filter based on
        task result successes. If both are true, raise an exception. If neither
        are true, build the list from the task node_filter.

        :param task: instance of objects.Task
        :param failures: whether to build target list from previous task failures
        :param successes: whether to build target list from previous task successes
        :raises errors.OrchestratorError: if both flags are set or the
            effective site design cannot be rendered
        """
        # Reject contradictory arguments before the design is rendered
        if failures and successes:
            raise errors.OrchestratorError(
                "Cannot specify both failures and successes.")

        design_status, site_design = self.get_effective_site(task.design_ref)

        # get_effective_site returns a None status when ingestion raised
        if (design_status is None
                or design_status.status != hd_fields.ActionResult.Success):
            raise errors.OrchestratorError(
                "Unable to render effective site design.")

        if failures:
            if len(task.result.failures) == 0:
                return []
            nf = task.node_filter_from_failures()
        elif successes:
            if len(task.result.successes) == 0:
                return []
            # NOTE(review): method name spelling is kept exactly as called
            # here; confirm it matches the definition on objects.Task.
            nf = task.node_filter_from_sucessess()
        else:
            nf = task.node_filter

        node_list = self.process_node_filter(nf, site_design)

        return node_list

    def create_nodefilter_from_nodelist(self, node_list):
        """Create a node filter to match list of nodes.

        Returns a dictionary that will be properly processed by the orchestrator

        :param node_list: List of objects.BaremetalNode instances the filter should match
        """
        nf = dict()

        nf['filter_set_type'] = 'intersection'
        nf['filter_set'] = [
            dict(
                node_names=[x.get_id() for x in node_list],
                filter_type='union')
        ]

        return nf

    def process_node_filter(self, node_filter, site_design):
        """Apply a node filter to the nodes of a site design.

        :param node_filter: None, or a dictionary with keys 'filter_set_type'
            ('union' or 'intersection') and 'filter_set' (a list of filters
            handled by ``process_filter``)
        :param site_design: design whose ``baremetal_nodes`` are filtered
        :returns: ``site_design.baremetal_nodes`` unchanged when node_filter
            is None, otherwise the combined result of every filter
        :raises errors.OrchestratorError: if node_filter is not a dictionary
            or cannot be processed
        """
        target_nodes = site_design.baremetal_nodes

        if node_filter is None:
            return target_nodes

        if not isinstance(node_filter, dict):
            msg = "Invalid node_filter, must be a dictionary with keys 'filter_set_type' and 'filter_set'."
            self.logger.error(msg)
            raise errors.OrchestratorError(msg)

        result_sets = []

        for f in node_filter.get('filter_set', []):
            result_sets.append(self.process_filter(target_nodes, f))

        return self.join_filter_sets(
            node_filter.get('filter_set_type'), result_sets)

    def join_filter_sets(self, filter_set_type, result_sets):
        """Combine per-filter result lists into one list.

        :param filter_set_type: 'union' or 'intersection'
        :param result_sets: list of lists to combine
        :returns: the combined list; an empty list when result_sets is empty
        :raises errors.OrchestratorError: for any other filter_set_type
        """
        if filter_set_type == 'union':
            return self.list_union(*result_sets)
        elif filter_set_type == 'intersection':
            # list_intersection needs at least one list; nothing to
            # intersect means nothing matched.
            if not result_sets:
                return []
            return self.list_intersection(*result_sets)
        else:
            raise errors.OrchestratorError(
                "Unknow filter set type %s" % filter_set_type)

    def process_filter(self, node_set, filter_set):
        """Take a filter and apply it to the node_set.

        Each criterion present in the filter ('node_names', 'node_tags',
        'rack_names', 'node_labels') selects a sub-list of node_set; the
        sub-lists are then combined according to 'filter_type'. Criteria that
        are absent or empty are ignored, so an intersection is taken only
        over the criteria actually supplied. A filter with no criteria
        matches no nodes.

        :param node_set: A full set of objects.BaremetalNode
        :param filter_set: A filter set describing filters to apply to the node set
        :returns: list of matching nodes
        :raises errors.OrchestratorError: on an unknown 'filter_type' or any
            error while evaluating the filter
        """
        try:
            set_type = filter_set.get('filter_type', None)

            # ``or`` also covers keys explicitly set to None
            node_names = filter_set.get('node_names') or []
            node_tags = filter_set.get('node_tags') or []
            node_labels = filter_set.get('node_labels') or {}
            rack_names = filter_set.get('rack_names') or []
            rack_labels = filter_set.get('rack_labels') or {}

            target_nodes = dict()

            if len(node_names) > 0:
                self.logger.debug("Filtering nodes based on node names.")
                target_nodes['node_names'] = [
                    x for x in node_set if x.get_name() in node_names
                ]

            if len(node_tags) > 0:
                self.logger.debug("Filtering nodes based on node tags.")
                # A node matches when it carries any of the requested tags
                target_nodes['node_tags'] = [
                    x for x in node_set
                    if any(x.has_tag(t) for t in node_tags)
                ]

            if len(rack_names) > 0:
                self.logger.debug("Filtering nodes based on rack names.")
                target_nodes['rack_names'] = [
                    x for x in node_set if x.get_rack() in rack_names
                ]

            if len(node_labels) > 0:
                self.logger.debug("Filtering nodes based on node labels.")
                # A node matches when any requested label equals the value
                # stored under the same key in its owner_data.
                target_nodes['node_labels'] = [
                    x for x in node_set
                    if any((getattr(x, 'owner_data', None) or {}).get(k, None)
                           == v for k, v in node_labels.items())
                ]

            if len(rack_labels) > 0:
                self.logger.info(
                    "Rack label filtering not yet implemented, returning all nodes."
                )
                # Recorded, but not consulted when combining results below
                target_nodes['rack_labels'] = node_set

            criteria = [
                target_nodes[key]
                for key in ('node_names', 'node_tags', 'rack_names',
                            'node_labels') if key in target_nodes
            ]

            if set_type == 'union':
                return self.list_union(*criteria)
            elif set_type == 'intersection':
                if not criteria:
                    return []
                return self.list_intersection(*criteria)

            raise errors.OrchestratorError(
                "Unknown filter type %s" % set_type)
        except errors.OrchestratorError:
            # Already the right exception type; do not wrap it again
            raise
        except Exception as ex:
            raise errors.OrchestratorError(
                "Error processing node filter: %s" % str(ex)) from ex

    def list_intersection(self, a, *rest):
        """Take the intersection of a with the intersection of all the rest.

        The result keeps the order of ``a`` and contains each value once.
        When no other lists are given, ``a`` is returned unchanged.

        :param a: list of values
        :params rest: 0 or more lists of values
        """
        if not rest:
            return a

        other_sets = [set(r) for r in rest]
        seen = set()
        common = []

        for item in a:
            if item in seen:
                continue
            seen.add(item)
            if all(item in s for s in other_sets):
                common.append(item)

        return common

    def list_union(self, *lists):
        """Return a unique-ified union of all the lists.

        Values appear in first-seen order. With no lists, an empty list is
        returned.

        :param lists: indefinite number of lists
        """
        seen = set()
        merged = []

        for values in lists:
            for item in values:
                if item not in seen:
                    seen.add(item)
                    merged.append(item)

        return merged