Refactor orchestrator

Refactor orchestrator to break large
monolithic functions into small functions
per action.

- Update orchestrator to match new statemgmt API
- Pull most code out of __init__.py files
- Create action classes for Orchestrator actions
- Create action classes for Driver actions
- Orchestrator consumes tasks from database queue
- Additional encapsulation of task functionality into Task class
- Create shared integration test fixtures
- Fix Sphinx entrypoint so package install works
- Disable bootdata API until BootAction implementation
- Bring codebase into PEP8 compliance
- Update documentation to reflect code changes
- Mark SQL #nosec for bandit
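As a rough illustration of the per-action structure described above (class and method names here are hypothetical, not the actual Drydock API), the orchestrator polls the database-backed task queue and dispatches each task to a small action class:

```python
# Hypothetical sketch of the per-action refactor: one small class per
# orchestrator action, dispatched from a database-backed task queue.
class BaseAction:
    """Common plumbing shared by every orchestrator action."""

    def __init__(self, task, state_manager):
        self.task = task
        self.state_manager = state_manager

    def start(self):
        raise NotImplementedError()


class ValidateDesign(BaseAction):
    """One action, one class: validate the task's design reference."""

    def start(self):
        # Real validation would load and check the design documents.
        return 'complete'


ACTION_MAP = {'validate_design': ValidateDesign}


def consume_next_task(state_manager):
    """Pull the next queued task from the database and run its action."""
    task = state_manager.get_next_queued_task()
    if task is None:
        return None
    action_class = ACTION_MAP[task['action']]
    return action_class(task, state_manager).start()
```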

Change-Id: Id9a7bdedcdd5bbf07aeabbdb52db0f0b71f1e4a4
Scott Hussey 2017-09-28 21:40:38 -05:00
parent 72adce3d38
commit d12ef71f9f
100 changed files with 6296 additions and 5578 deletions

View File

@ -16,6 +16,8 @@ FROM ubuntu:16.04
ENV DEBIAN_FRONTEND noninteractive
ENV container docker
ENV PORT 9000
ENV LC_ALL C.UTF-8
ENV LANG C.UTF-8
RUN apt -qq update && \
apt -y install git \
@ -31,6 +33,7 @@ RUN apt -qq update && \
libffi-dev \
libssl-dev --no-install-recommends
RUN pip3 install wheel
# Copy direct dependency requirements only to build a dependency layer
COPY ./requirements-lock.txt /tmp/drydock/
RUN pip3 install -r /tmp/drydock/requirements-lock.txt

README.md
View File

@ -1,76 +1,76 @@
# drydock_provisioner
A python REST orchestrator to translate a YAML host topology to a provisioned set of hosts and provide a set of cloud-init post-provisioning instructions.
A python REST orchestrator to translate a YAML host topology to a provisioned
set of hosts and provide a set of post-provisioning instructions.
To build and run, first move into the root directory of the repo and run:
See full documentation in [docs/source/index.rst](docs/source/index.rst).
## Required
* Python 3.5+
* A running instance of Postgres v9.5+
* A running instance of Openstack Keystone w/ the v3 API enabled
* A running instance of Canonical MaaS v2.2+
## Recommended
* A running Kubernetes cluster with Helm initialized
* Familiarity with the AT&T Community Undercloud Platform (UCP) suite of services
## Building
This service is intended to be built as a Docker container, not as a
standalone Python package. That being said, instructions are included below
for building as a package and as an image.
### Virtualenv
To build and install Drydock locally in a virtualenv first generate configuration
and policy file templates to be customized
$ tox -e genconfig
$ tox -e genpolicy
$ sudo docker build . -t drydock
$ vi etc/drydock/drydock.conf # Customize configuration
$ sudo docker run -d -v $(pwd)/etc/drydock:/etc/drydock -P --name='drydock' drydock
$ DDPORT=$(sudo docker port drydock 8000/tcp | awk -F ':' '{ print $NF }')
$ curl -v http://localhost:${DDPORT}/api/v1.0/designs
$ virtualenv -p python3.5 /var/tmp/drydock
$ . /var/tmp/drydock/bin/activate
$ pip install -r requirements-lock.txt
$ pip install .
$ cp -r etc/drydock /etc/drydock
See [Configuring Drydock](docs/configuration.rst) for details on customizing the configuration. To be useful, Drydock needs
to operate in a realistic topology and requires several downstream services.
### Docker image
* A VM running Canonical MaaS v2.2+
* A functional Openstack Keystone instance w/ the v3 API
* Docker running to start the Drydock image (can be co-located on the MaaS VM)
* A second VM or Baremetal Node to provision via Drydock
* Baremetal needs to be able to PXE boot
* Preferably Baremetal will have an IPMI OOB interface
* Either VM or Baremetal will need to have one interface on the same L2 network (LAN or VLAN) as the MaaS VM
$ docker build . -t drydock
See the [Getting Started](docs/getting_started.rst) guide for instructions.
## Running
## Modular service
The preferred deployment pattern of Drydock is via a Helm chart
to deploy Drydock into a Kubernetes cluster. Additionally use of
the rest of the UCP services provides additional functionality
for deploying (Armada) and using (Promenade, Deckhand) Drydock.
### Design Consumer ###
You can see an example of a full UCP deployment in the [UCP Integration](https://github.com/att-comdev/ucp-integration) repository.
aka ingester
### Stand up Kubernetes
Pluggable service to ingest an inventory/design specification, convert it to a standard
internal representation, and persist it to the Design State API. The initial implementation
consumes the YAML schema.
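The pluggable-ingester idea can be sketched roughly as follows; loading by module-path string mirrors how the plugin is named in configuration, but the helper shown here is illustrative, not Drydock's actual interface:

```python
import importlib


def load_plugin(module_path):
    """Instantiate a plugin class named by a 'package.module.ClassName'
    string, the way a configured ingester plugin would be loaded."""
    module_name, _, class_name = module_path.rpartition('.')
    module = importlib.import_module(module_name)
    return getattr(module, class_name)()
```

For example, `load_plugin('drydock_provisioner.ingester.plugins.yaml.YamlIngester')` would return an instance of the YAML ingester.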
Use the UCP [Promenade](https://github.com/att-comdev/promenade)
tool for starting a self-hosted Kubernetes cluster with Kubernetes
Helm deployed.
### Design State API ###
### Deploy Drydock Dependencies
aka statemgmt
There are Helm charts for deploying all the dependencies of Drydock.
Use them to prepare your Kubernetes cluster to host Drydock.
API for querying and updating the current design specification and persisted orchestration status.
CRUD support of CIs that are not bootstrap-related, but can be used by other automation.
* [Postgres](https://github.com/openstack/openstack-helm/tree/master/postgresql)
* [Keystone](https://github.com/openstack/openstack-helm/tree/master/keystone)
* [MAAS](https://github.com/att-comdev/maas)
### Control API ###
### Deploy Drydock
aka control
Ideally you will use the UCP [Armada](https://readthedocs.org/projects/armada-helm/)
tool for deploying the Drydock chart with proper overrides, but if not you can
use the `helm` CLI tool. The overrides below are needed during deployment:
User-approachable API for initiating orchestration actions or accessing other internal
APIs
### Infrastructure Orchestrator ###
aka orchestrator
Handle validation of complete design, ordering and managing downstream API calls for hardware
provisioning/bootstrapping
### OOB Driver ###
Pluggable provider for server OOB (ILO) management
aka driver/oob
### Node Driver ###
aka driver/node
Pluggable provisioner for server bootstrapping. Initial implementation is MaaS client.
### Introspection API ###
aka introspection
API used by bootstrapping nodes to load their own data. Possibly pluggable, as this is basically an
authenticated bridge to the Design State API.
* values.labels.node_selector_key: This is the kubernetes label assigned to the node you expect to host Drydock
* values.conf.drydock.maasdriver: This is the URL Drydock will use to access the MAAS API (including the URL path)
* values.images.drydock: The Drydock docker image to use
* values.images.drydock_db_sync: The Drydock docker image to use for the database sync job

View File

@ -1,6 +1,5 @@
"""Alembic database creation and upgrades."""
import os
import sys
from alembic import context
from sqlalchemy import engine_from_config, pool
@ -64,13 +63,12 @@ def run_migrations_online():
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata
)
connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:

View File

@ -13,15 +13,18 @@ branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
from drydock_provisioner.statemgmt.db import tables
def upgrade():
op.create_table(tables.Tasks.__tablename__, *tables.Tasks.__schema__)
op.create_table(tables.ResultMessage.__tablename__, *tables.ResultMessage.__schema__)
op.create_table(tables.ActiveInstance.__tablename__, *tables.ActiveInstance.__schema__)
op.create_table(tables.BuildData.__tablename__, *tables.BuildData.__schema__)
op.create_table(tables.ResultMessage.__tablename__,
*tables.ResultMessage.__schema__)
op.create_table(tables.ActiveInstance.__tablename__,
*tables.ActiveInstance.__schema__)
op.create_table(tables.BuildData.__tablename__,
*tables.BuildData.__schema__)
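The migration above leans on a shared-definition pattern: each table class exposes `__tablename__` and `__schema__`, so one definition drives both the runtime model and `op.create_table`. A minimal stand-in (column definitions stubbed as tuples rather than real SQLAlchemy Columns):

```python
class Tasks:
    """Stub of a shared table definition; the real code uses SQLAlchemy
    Column objects in __schema__."""
    __tablename__ = 'tasks'
    __schema__ = (('task_id', 'varchar(32)'), ('action', 'varchar(32)'))


def create_all(op, table_classes):
    """Create every table from its shared definition, as upgrade() does."""
    for cls in table_classes:
        op.create_table(cls.__tablename__, *cls.__schema__)
```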
def downgrade():

View File

@ -56,7 +56,7 @@ Load Site
To use Drydock for site configuration, you must craft and load a site topology
YAML. An example of this is in ``./examples/designparts_v1.0.yaml``.
Documentation on building your topology document is under construction.
Documentation on building your topology document is at :ref:`topology_label`.
Use the Drydock CLI create a design and load the configuration

View File

@ -32,6 +32,7 @@ Drydock Configuration Guide
:maxdepth: 2
getting_started
configuration
sampleconf
policy-enforcement

View File

@ -14,6 +14,8 @@
License for the specific language governing permissions and limitations
under the License.
.. _topology_label:
=======================
Authoring Site Topology
=======================

View File

@ -20,11 +20,10 @@ import click
from drydock_provisioner.drydock_client.session import DrydockSession
from drydock_provisioner.drydock_client.session import KeystoneClient
from drydock_provisioner.drydock_client.client import DrydockClient
from .design import commands as design
from .part import commands as part
from .task import commands as task
from .node import commands as node
@click.group()
@click.option(
'--debug/--no-debug', help='Enable or disable debugging', default=False)
@ -49,8 +48,8 @@ from .node import commands as node
help='The url of the running drydock instance',
default=lambda: os.environ.get('DD_URL', ''))
@click.pass_context
def drydock(ctx, debug, url, os_project_domain_name, os_user_domain_name, os_project_name,
os_username, os_password, os_auth_url, os_token):
def drydock(ctx, debug, url, os_project_domain_name, os_user_domain_name,
os_project_name, os_username, os_password, os_auth_url, os_token):
"""Drydock CLI to invoke the running instance of the drydock API."""
if not ctx.obj:
ctx.obj = {}
@ -80,10 +79,12 @@ def drydock(ctx, debug, url, os_project_domain_name, os_user_domain_name, os_pro
try:
if not os_token:
logger.debug("Generating Keystone session by env vars: %s" % str(keystone_env))
logger.debug("Generating Keystone session by env vars: %s" %
str(keystone_env))
ks_sess = KeystoneClient.get_ks_session(**keystone_env)
else:
logger.debug("Generating Keystone session by explicit token: %s" % os_token)
logger.debug(
"Generating Keystone session by explicit token: %s" % os_token)
ks_sess = KeystoneClient.get_ks_session(token=os_token)
KeystoneClient.get_token(ks_sess=ks_sess)
except Exception as ex:
@ -93,7 +94,8 @@ def drydock(ctx, debug, url, os_project_domain_name, os_user_domain_name, os_pro
try:
if not url:
url = KeystoneClient.get_endpoint('physicalprovisioner', ks_sess=ks_sess)
url = KeystoneClient.get_endpoint(
'physicalprovisioner', ks_sess=ks_sess)
except Exception as ex:
logger.debug("Exception getting Drydock endpoint.", exc_info=ex)
ctx.fail('Error: Unable to discover Drydock API URL')
@ -116,7 +118,6 @@ def drydock(ctx, debug, url, os_project_domain_name, os_user_domain_name, os_pro
port=url_parse_result.port,
token=token))
drydock.add_command(design.design)
drydock.add_command(part.part)
drydock.add_command(task.task)
drydock.add_command(node.node)

View File

@ -27,8 +27,10 @@ def node():
""" Drydock node commands
"""
@node.command(name='list')
@click.option('--output', '-o', help='Output format: table|json', default='table')
@click.option(
'--output', '-o', help='Output format: table|json', default='table')
@click.pass_context
def node_list(ctx, output='table'):
"""List nodes."""
@ -37,10 +39,17 @@ def node_list(ctx, output='table'):
if output == 'table':
pt = PrettyTable()
pt.field_names = ['Node Name', 'Status', 'CPUs', 'Memory', 'PXE MAC', 'Mgmt IP', 'IPMI IP', 'Power State']
pt.field_names = [
'Node Name', 'Status', 'CPUs', 'Memory', 'PXE MAC', 'Mgmt IP',
'IPMI IP', 'Power State'
]
for n in nodelist:
pt.add_row([n['hostname'], n['status_name'], n['cpu_count'], n['memory'], n['boot_mac'], n['boot_ip'], n['power_address'], n['power_state']])
pt.add_row([
n['hostname'], n['status_name'], n['cpu_count'], n['memory'],
n['boot_mac'], n['boot_ip'], n['power_address'],
n['power_state']
])
click.echo(pt)
elif output == 'json':


@ -11,89 +11,128 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Actions related to task commands
"""
"""Actions related to task commands."""
import time
from drydock_provisioner.cli.action import CliAction
class TaskList(CliAction): # pylint: disable=too-few-public-methods
""" Action to list tasks
"""
"""Action to list tasks."""
def __init__(self, api_client):
"""
:param DrydockClient api_client: The api client used for invocation.
"""Object initializer.
:param DrydockClient api_client: The api client used for invocation.
"""
super().__init__(api_client)
self.logger.debug('TaskList action initialized')
def invoke(self):
"""Invoke execution of this action."""
return self.api_client.get_tasks()
class TaskCreate(CliAction): # pylint: disable=too-few-public-methods
""" Action to create tasks against a design
"""
"""Action to create tasks against a design."""
def __init__(self,
api_client,
design_id,
design_ref,
action_name=None,
node_names=None,
rack_names=None,
node_tags=None):
"""
:param DrydockClient api_client: The api client used for invocation.
:param string design_id: The UUID of the design for which to create a task
:param string action_name: The name of the action being performed for this task
:param List node_names: The list of node names to restrict action application
:param List rack_names: The list of rack names to restrict action application
:param List node_tags: The list of node tags to restrict action application
node_tags=None,
block=False,
poll_interval=15):
"""Object initializer.
:param DrydockClient api_client: The api client used for invocation.
:param string design_ref: The URI reference to design documents
:param string action_name: The name of the action being performed for this task
:param List node_names: The list of node names to restrict action application
:param List rack_names: The list of rack names to restrict action application
:param List node_tags: The list of node tags to restrict action application
:param bool block: Whether to block CLI exit until task completes
:param integer poll_interval: Polling interval to query task status
"""
super().__init__(api_client)
self.design_id = design_id
self.design_ref = design_ref
self.action_name = action_name
self.logger.debug('TaskCreate action initialized for design=%s',
design_id)
design_ref)
self.logger.debug('Action is %s', action_name)
if node_names is None:
node_names = []
if rack_names is None:
rack_names = []
if node_tags is None:
node_tags = []
self.logger.debug("Node names = %s", node_names)
self.logger.debug("Rack names = %s", rack_names)
self.logger.debug("Node tags = %s", node_tags)
self.block = block
self.poll_interval = poll_interval
filter_items = {'filter_type': 'union'}
if node_names is not None:
filter_items['node_names'] = node_names
if rack_names is not None:
filter_items['rack_names'] = rack_names
if node_tags is not None:
filter_items['node_tags'] = node_tags
self.node_filter = {
'node_names': node_names,
'rack_names': rack_names,
'node_tags': node_tags
'filter_set_type': 'intersection',
'filter_set': [filter_items]
}
def invoke(self):
return self.api_client.create_task(
design_id=self.design_id,
"""Invoke execution of this action."""
task = self.api_client.create_task(
design_ref=self.design_ref,
task_action=self.action_name,
node_filter=self.node_filter)
if not self.block:
return task
task_id = task.get('task_id')
while True:
time.sleep(self.poll_interval)
task = self.api_client.get_task(task_id=task_id)
if task.get('status') in ['completed', 'terminated']:
return task
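The filter construction in TaskCreate above, extracted as a standalone sketch (with the apparent `is None` slip on the node-tags check written as `is not None`), yields the nested filter-set structure the API expects:

```python
def build_node_filter(node_names=None, rack_names=None, node_tags=None):
    """Build a union filter item wrapped in an intersection filter set,
    mirroring the CLI's TaskCreate construction (sketch)."""
    filter_items = {'filter_type': 'union'}
    if node_names is not None:
        filter_items['node_names'] = node_names
    if rack_names is not None:
        filter_items['rack_names'] = rack_names
    if node_tags is not None:
        filter_items['node_tags'] = node_tags
    return {'filter_set_type': 'intersection', 'filter_set': [filter_items]}
```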
class TaskShow(CliAction): # pylint: disable=too-few-public-methods
""" Action to show a task's detail.
"""
"""Action to show a task's detail."""
def __init__(self, api_client, task_id):
"""
:param DrydockClient api_client: The api client used for invocation.
:param string task_id: the UUID of the task to retrieve
def __init__(self, api_client, task_id, block=False, poll_interval=15):
"""Object initializer.
:param DrydockClient api_client: The api client used for invocation.
:param string task_id: the UUID of the task to retrieve
:param bool block: Whether to block CLI exit until task completes
:param integer poll_interval: Polling interval to query task status
"""
super().__init__(api_client)
self.task_id = task_id
self.logger.debug('TaskShow action initialized for task_id=%s,',
task_id)
self.block = block
self.poll_interval = poll_interval
def invoke(self):
return self.api_client.get_task(task_id=self.task_id)
"""Invoke execution of this action."""
task = self.api_client.get_task(task_id=self.task_id)
if not self.block:
return task
task_id = task.get('task_id')
while True:
time.sleep(self.poll_interval)
task = self.api_client.get_task(task_id=task_id)
if task.get('status') in ['completed', 'terminated']:
return task
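The blocking behavior shared by TaskCreate and TaskShow reduces to a polling loop like the following sketch (status read as a dict key, since the client returns parsed JSON):

```python
import time


def wait_for_task(get_task, task_id, poll_interval=0,
                  terminal=('completed', 'terminated')):
    """Poll a task until it reaches a terminal status, then return it."""
    while True:
        task = get_task(task_id=task_id)
        if task.get('status') in terminal:
            return task
        time.sleep(poll_interval)
```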

View File

@ -11,9 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" cli.task.commands
Contains commands related to tasks against designs
"""
"""Contains commands related to tasks against designs."""
import click
import json
@ -24,12 +22,12 @@ from drydock_provisioner.cli.task.actions import TaskCreate
@click.group()
def task():
""" Drydock task commands
"""
"""Drydock task commands."""
@task.command(name='create')
@click.option('--design-id', '-d', help='The design id for this action')
@click.option(
'--design-ref', '-d', help='The design reference for this action')
@click.option('--action', '-a', help='The action to perform')
@click.option(
'--node-names',
@ -43,17 +41,28 @@ def task():
'--node-tags',
'-t',
help='The nodes by tag name targeted by this action, comma separated')
@click.option(
'--block/--no-block',
'-b',
help='The CLI will wait until the created task completes before exiting',
default=False)
@click.option(
'--poll-interval',
help='Polling interval to check task status in blocking mode.',
default=15)
@click.pass_context
def task_create(ctx,
design_id=None,
design_ref=None,
action=None,
node_names=None,
rack_names=None,
node_tags=None):
""" Create a task
"""
if not design_id:
ctx.fail('Error: Design id must be specified using --design-id')
node_tags=None,
block=False,
poll_interval=15):
"""Create a task."""
if not design_ref:
ctx.fail(
'Error: Design reference must be specified using --design-ref')
if not action:
ctx.fail('Error: Action must be specified using --action')
@ -62,30 +71,35 @@ def task_create(ctx,
json.dumps(
TaskCreate(
ctx.obj['CLIENT'],
design_id=design_id,
design_ref=design_ref,
action_name=action,
node_names=[x.strip() for x in node_names.split(',')]
if node_names else [],
rack_names=[x.strip() for x in rack_names.split(',')]
if rack_names else [],
node_tags=[x.strip() for x in node_tags.split(',')]
if node_tags else []).invoke()))
if node_tags else [],
block=block,
poll_interval=poll_interval).invoke()))
@task.command(name='list')
@click.pass_context
def task_list(ctx):
""" List tasks.
"""
"""List tasks."""
click.echo(json.dumps(TaskList(ctx.obj['CLIENT']).invoke()))
@task.command(name='show')
@click.option('--task-id', '-t', help='The required task id')
@click.option(
'--block/--no-block',
'-b',
help='The CLI will wait until the created task completes before exiting',
default=False)
@click.pass_context
def task_show(ctx, task_id=None):
""" show a task's details
"""
def task_show(ctx, task_id=None, block=False):
"""Show a task's details."""
if not task_id:
ctx.fail('The task id must be specified by --task-id')

View File

@ -89,16 +89,16 @@ class DrydockConfig(object):
# Enabled plugins
plugin_options = [
cfg.MultiStrOpt(
cfg.StrOpt(
'ingester',
default=['drydock_provisioner.ingester.plugins.yaml.YamlIngester'],
default='drydock_provisioner.ingester.plugins.yaml.YamlIngester',
help='Module path string of an input ingester to enable'),
cfg.MultiStrOpt(
cfg.ListOpt(
'oob_driver',
default=[
'drydock_provisioner.drivers.oob.pyghmi_driver.PyghmiDriver'
],
help='Module path string of a OOB driver to enable'),
help='List of module path strings of OOB drivers to enable'),
cfg.StrOpt(
'node_driver',
default=
@ -205,8 +205,8 @@ def _list_module_names(pkg_path, parent_module):
continue
elif ispkg:
module_names.extend(
_list_module_names(pkg_path + "/" + module_name, parent_module
+ "." + module_name))
_list_module_names(pkg_path + "/" + module_name,
parent_module + "." + module_name))
else:
module_names.append(parent_module + "." + module_name)
return module_names

View File

@ -13,9 +13,14 @@
# limitations under the License.
import falcon
from .designs import *
from .tasks import *
from .bootdata import *
from .designs import DesignsResource
from .designs import DesignResource
from .designs import DesignsPartsResource
from .designs import DesignsPartsKindsResource
from .designs import DesignsPartResource
from .tasks import TasksResource
from .tasks import TaskResource
from .bootdata import BootdataResource
from .nodes import NodesResource
from .base import DrydockRequest
@ -53,8 +58,8 @@ def start_api(state_manager=None, ingester=None, orchestrator=None):
state_manager=state_manager, orchestrator=orchestrator)),
('/designs/{design_id}/parts', DesignsPartsResource(
state_manager=state_manager, ingester=ingester)),
('/designs/{design_id}/parts/{kind}', DesignsPartsKindsResource(
state_manager=state_manager)),
('/designs/{design_id}/parts/{kind}',
DesignsPartsKindsResource(state_manager=state_manager)),
('/designs/{design_id}/parts/{kind}/{name}', DesignsPartResource(
state_manager=state_manager, orchestrator=orchestrator)),

View File

@ -11,10 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import json
import yaml
import base64
"""Handle resources for bootdata API endpoints.
THIS API IS DEPRECATED
"""
from oslo_config import cfg
@ -56,64 +56,64 @@ class BootdataResource(StatefulResource):
resp.content_type = 'text/plain'
return
elif data_key == 'promconfig':
bootdata = self.state_manager.get_bootdata_key(hostname)
if bootdata is None:
resp.status = falcon.HTTP_404
return
else:
resp.content_type = 'text/plain'
host_design_id = bootdata.get('design_id', None)
host_design = self.orchestrator.get_effective_site(
host_design_id)
host_model = host_design.get_baremetal_node(hostname)
part_selectors = ['all', hostname]
if host_model.tags is not None:
part_selectors.extend(host_model.tags)
all_configs = host_design.get_promenade_config(part_selectors)
part_list = [i.document for i in all_configs]
resp.body = "---\n" + "---\n".join([
base64.b64decode(i.encode()).decode('utf-8')
for i in part_list
]) + "\n..."
return
# The next PS will be a complete rewrite of the bootdata system
# so not wasting time refactoring this
# TODO(sh8121att) rebuild bootdata API for BootAction framework
resp.content = 'text/plain'
return
prom_init_service = \
r"""[Unit]
Description=Promenade Initialization Service
Documentation=http://github.com/att-comdev/drydock
After=network-online.target local-fs.target
ConditionPathExists=!/var/lib/prom.done
# bootdata = self.state_manager.get_bootdata_key(hostname)
#
# if bootdata is None:
# resp.status = falcon.HTTP_404
# return
# else:
# resp.content_type = 'text/plain'
#
# host_design_id = bootdata.get('design_id', None)
# host_design = self.orchestrator.get_effective_site(
# host_design_id)
#
# host_model = host_design.get_baremetal_node(hostname)
#
# part_selectors = ['all', hostname]
#
# if host_model.tags is not None:
# part_selectors.extend(host_model.tags)
#
# all_configs = host_design.get_promenade_config(part_selectors)
#
# part_list = [i.document for i in all_configs]
#
# resp.body = "---\n" + "---\n".join([
# base64.b64decode(i.encode()).decode('utf-8')
# for i in part_list
# ]) + "\n..."
# return
[Service]
Type=simple
ExecStart=/var/tmp/prom_init.sh /etc/prom_init.yaml
prom_init_service = (
"[Unit]\n"
"Description=Promenade Initialization Service\n"
"Documentation=http://github.com/att-comdev/drydock\n"
"After=network-online.target local-fs.target\n"
"ConditionPathExists=!/var/lib/prom.done\n\n"
"[Service]\n"
"Type=simple\n"
"ExecStart=/var/tmp/prom_init.sh /etc/prom_init.yaml\n\n"
"[Install]\n"
"WantedBy=multi-user.target\n")
[Install]
WantedBy=multi-user.target
"""
vfs_service = \
r"""[Unit]
Description=SR-IOV Virtual Function configuration
Documentation=http://github.com/att-comdev/drydock
After=network.target local-fs.target
[Service]
Type=simple
ExecStart=/bin/sh -c '/bin/echo 4 >/sys/class/net/ens3f0/device/sriov_numvfs'
[Install]
WantedBy=multi-user.target
"""
vfs_service = (
"[Unit]\n"
"Description=SR-IOV Virtual Function configuration\n"
"Documentation=http://github.com/att-comdev/drydock\n"
"After=network.target local-fs.target\n\n"
"[Service]\n"
"Type=simple\n"
"ExecStart=/bin/sh -c '/bin/echo 4 >/sys/class/net/ens3f0/device/sriov_numvfs'\n\n"
"[Install]\n"
"WantedBy=multi-user.target\n")
def list_opts():

View File

@ -10,10 +10,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Handler resources for Design API endpoints.
NOTICE - THIS API IS DEPRECATED
"""
import falcon
import json
import uuid
import logging
import drydock_provisioner.policy as policy
import drydock_provisioner.objects as hd_objects
@ -23,12 +27,18 @@ from .base import StatefulResource
class DesignsResource(StatefulResource):
"""Resource handlers for design collection."""
def __init__(self, **kwargs):
super(DesignsResource, self).__init__(**kwargs)
@policy.ApiEnforcer('physical_provisioner:read_data')
def on_get(self, req, resp):
ctx = req.context
"""Method handler for GET requests.
:param req: Falcon request object
:param resp: Falcon response object
"""
state = self.state_manager
try:
@ -46,8 +56,11 @@ class DesignsResource(StatefulResource):
@policy.ApiEnforcer('physical_provisioner:ingest_data')
def on_post(self, req, resp):
ctx = req.context
"""Method handler for POST requests.
:param req: Falcon request object
:param resp: Falcon response object
"""
try:
json_data = self.req_json(req)
design = None
@ -56,8 +69,7 @@ class DesignsResource(StatefulResource):
if base_design is not None:
base_design = uuid.UUID(base_design)
design = hd_objects.SiteDesign(
base_design_id=base_design_uuid)
design = hd_objects.SiteDesign(base_design_id=base_design)
else:
design = hd_objects.SiteDesign()
design.assign_id()
@ -79,6 +91,8 @@ class DesignsResource(StatefulResource):
class DesignResource(StatefulResource):
"""Resource Handlers for design singletons."""
def __init__(self, orchestrator=None, **kwargs):
super(DesignResource, self).__init__(**kwargs)
self.authorized_roles = ['user']
@ -86,8 +100,13 @@ class DesignResource(StatefulResource):
@policy.ApiEnforcer('physical_provisioner:read_data')
def on_get(self, req, resp, design_id):
"""Method Handler for GET design singleton.
:param req: Falcon request object
:param resp: Falcon response object
:param design_id: UUID of the design resource
"""
source = req.params.get('source', 'designed')
ctx = req.context
try:
design = None
@ -168,7 +187,7 @@ class DesignsPartsResource(StatefulResource):
def on_get(self, req, resp, design_id):
try:
design = self.state_manager.get_design(design_id)
except DesignError:
except errors.DesignError:
self.return_error(
resp,
falcon.HTTP_404,
@ -218,7 +237,6 @@ class DesignsPartsKindsResource(StatefulResource):
@policy.ApiEnforcer('physical_provisioner:read_data')
def on_get(self, req, resp, design_id, kind):
ctx = req.context
resp.status = falcon.HTTP_200
@ -231,7 +249,6 @@ class DesignsPartResource(StatefulResource):
@policy.ApiEnforcer('physical_provisioner:read_data')
def on_get(self, req, resp, design_id, kind, name):
ctx = req.context
source = req.params.get('source', 'designed')
try:

View File

@ -11,9 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Middleware classes for Falcon-based API."""
import logging
import uuid
import re
from oslo_config import cfg

View File

@ -30,8 +30,9 @@ class NodesResource(BaseResource):
@policy.ApiEnforcer('physical_provisioner:read_data')
def on_get(self, req, resp):
try:
maas_client = MaasRequestFactory(config.config_mgr.conf.maasdriver.maas_api_url,
config.config_mgr.conf.maasdriver.maas_api_key)
maas_client = MaasRequestFactory(
config.config_mgr.conf.maasdriver.maas_api_url,
config.config_mgr.conf.maasdriver.maas_api_key)
machine_list = Machines(maas_client)
machine_list.refresh()
@ -39,7 +40,16 @@ class NodesResource(BaseResource):
node_view = list()
for m in machine_list:
m.get_power_params()
node_view.append(dict(hostname=m.hostname, memory=m.memory, cpu_count=m.cpu_count, status_name=m.status_name, boot_mac=m.boot_mac, power_state=m.power_state, power_address=m.power_parameters.get('power_address'), boot_ip=m.boot_ip))
node_view.append(
dict(
hostname=m.hostname,
memory=m.memory,
cpu_count=m.cpu_count,
status_name=m.status_name,
boot_mac=m.boot_mac,
power_state=m.power_state,
power_address=m.power_parameters.get('power_address'),
boot_ip=m.boot_ip))
resp.body = json.dumps(node_view)
resp.status = falcon.HTTP_200

View File

@ -11,28 +11,38 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Handler resources for task management API."""
import falcon
import json
import threading
import traceback
import uuid
from drydock_provisioner import policy
from drydock_provisioner import error as errors
from drydock_provisioner.objects import fields as hd_fields
import drydock_provisioner.objects.task as obj_task
from .base import StatefulResource
class TasksResource(StatefulResource):
"""Handler resource for /tasks collection endpoint."""
def __init__(self, orchestrator=None, **kwargs):
super(TasksResource, self).__init__(**kwargs)
"""Object initializer.
:param orchestrator: instance of orchestrator.Orchestrator
"""
super().__init__(**kwargs)
self.orchestrator = orchestrator
@policy.ApiEnforcer('physical_provisioner:read_task')
def on_get(self, req, resp):
"""Handler for GET method."""
try:
task_id_list = [str(x.get_id()) for x in self.state_manager.tasks]
resp.body = json.dumps(task_id_list)
task_model_list = self.state_manager.get_tasks()
task_list = [str(x.to_dict()) for x in task_model_list]
resp.body = json.dumps(task_list)
resp.status = falcon.HTTP_200
except Exception as ex:
self.error(req.context, "Unknown error: %s\n%s" %
@ -42,19 +52,19 @@ class TasksResource(StatefulResource):
@policy.ApiEnforcer('physical_provisioner:create_task')
def on_post(self, req, resp):
"""Handler for POST method."""
# A map of supported actions to their task handlers
supported_actions = {
'validate_design': TasksResource.task_validate_design,
'verify_site': TasksResource.task_verify_site,
'prepare_site': TasksResource.task_prepare_site,
'verify_node': TasksResource.task_verify_node,
'prepare_node': TasksResource.task_prepare_node,
'deploy_node': TasksResource.task_deploy_node,
'destroy_node': TasksResource.task_destroy_node,
'verify_nodes': TasksResource.task_verify_nodes,
'prepare_nodes': TasksResource.task_prepare_nodes,
'deploy_nodes': TasksResource.task_deploy_nodes,
'destroy_nodes': TasksResource.task_destroy_nodes,
}
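The map above lets on_post route each request to its handler with a single lookup; a simplified sketch of that dispatch (Falcon request/response plumbing omitted):

```python
def dispatch_task_action(action, json_data, handlers):
    """Look up the requested task action and invoke its handler, or fail
    loudly for unsupported actions (sketch of the on_post routing)."""
    handler = handlers.get(action)
    if handler is None:
        raise ValueError('Unsupported task action: %s' % action)
    return handler(json_data)
```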
try:
ctx = req.context
json_data = self.req_json(req)
action = json_data.get('action', None)
@ -75,6 +85,7 @@ class TasksResource(StatefulResource):
@policy.ApiEnforcer('physical_provisioner:validate_design')
def task_validate_design(self, req, resp, json_data):
"""Create async task for validate design."""
action = json_data.get('action', None)
if action != 'validate_design':
@ -86,7 +97,7 @@ class TasksResource(StatefulResource):
resp, falcon.HTTP_500, message="Error", retry=False)
try:
task = self.create_task(json_data)
task = self.create_task(json_data, req.context)
resp.body = json.dumps(task.to_dict())
resp.append_header('Location',
"/api/v1.0/tasks/%s" % str(task.task_id))
@ -98,6 +109,7 @@ class TasksResource(StatefulResource):
@policy.ApiEnforcer('physical_provisioner:verify_site')
def task_verify_site(self, req, resp, json_data):
"""Create async task for verify site."""
action = json_data.get('action', None)
if action != 'verify_site':
@ -109,7 +121,7 @@ class TasksResource(StatefulResource):
resp, falcon.HTTP_500, message="Error", retry=False)
try:
task = self.create_task(json_data)
task = self.create_task(json_data, req.context)
resp.body = json.dumps(task.to_dict())
resp.append_header('Location',
"/api/v1.0/tasks/%s" % str(task.task_id))
@ -121,6 +133,7 @@ class TasksResource(StatefulResource):
@policy.ApiEnforcer('physical_provisioner:prepare_site')
def task_prepare_site(self, req, resp, json_data):
"""Create async task for prepare site."""
action = json_data.get('action', None)
if action != 'prepare_site':
@ -132,7 +145,7 @@ class TasksResource(StatefulResource):
resp, falcon.HTTP_500, message="Error", retry=False)
try:
task = self.create_task(json_data)
task = self.create_task(json_data, req.context)
resp.body = json.dumps(task.to_dict())
resp.append_header('Location',
"/api/v1.0/tasks/%s" % str(task.task_id))
@ -142,20 +155,21 @@ class TasksResource(StatefulResource):
self.return_error(
resp, falcon.HTTP_400, message=ex.msg, retry=False)
@policy.ApiEnforcer('physical_provisioner:verify_node')
def task_verify_node(self, req, resp, json_data):
@policy.ApiEnforcer('physical_provisioner:verify_nodes')
def task_verify_nodes(self, req, resp, json_data):
"""Create async task for verify nodes."""
action = json_data.get('action', None)
if action != 'verify_node':
if action != 'verify_nodes':
self.error(
req.context,
"Task body ended up in wrong handler: action %s in task_verify_node"
"Task body ended up in wrong handler: action %s in task_verify_nodes"
% action)
self.return_error(
resp, falcon.HTTP_500, message="Error", retry=False)
try:
task = self.create_task(json_data)
task = self.create_task(json_data, req.context)
resp.body = json.dumps(task.to_dict())
resp.append_header('Location',
"/api/v1.0/tasks/%s" % str(task.task_id))
@ -165,20 +179,21 @@ class TasksResource(StatefulResource):
self.return_error(
resp, falcon.HTTP_400, message=ex.msg, retry=False)
@policy.ApiEnforcer('physical_provisioner:prepare_node')
def task_prepare_node(self, req, resp, json_data):
@policy.ApiEnforcer('physical_provisioner:prepare_nodes')
def task_prepare_nodes(self, req, resp, json_data):
"""Create async task for prepare nodes."""
action = json_data.get('action', None)
if action != 'prepare_node':
if action != 'prepare_nodes':
self.error(
req.context,
"Task body ended up in wrong handler: action %s in task_prepare_node"
"Task body ended up in wrong handler: action %s in task_prepare_nodes"
% action)
self.return_error(
resp, falcon.HTTP_500, message="Error", retry=False)
try:
task = self.create_task(json_data)
task = self.create_task(json_data, req.context)
resp.body = json.dumps(task.to_dict())
resp.append_header('Location',
"/api/v1.0/tasks/%s" % str(task.task_id))
@ -188,20 +203,21 @@ class TasksResource(StatefulResource):
self.return_error(
resp, falcon.HTTP_400, message=ex.msg, retry=False)
@policy.ApiEnforcer('physical_provisioner:deploy_node')
def task_deploy_node(self, req, resp, json_data):
@policy.ApiEnforcer('physical_provisioner:deploy_nodes')
def task_deploy_nodes(self, req, resp, json_data):
"""Create async task for deploy nodes."""
action = json_data.get('action', None)
if action != 'deploy_node':
if action != 'deploy_nodes':
self.error(
req.context,
"Task body ended up in wrong handler: action %s in task_deploy_node"
"Task body ended up in wrong handler: action %s in task_deploy_nodes"
% action)
self.return_error(
resp, falcon.HTTP_500, message="Error", retry=False)
try:
task = self.create_task(json_data)
task = self.create_task(json_data, req.context)
resp.body = json.dumps(task.to_dict())
resp.append_header('Location',
"/api/v1.0/tasks/%s" % str(task.task_id))
@ -211,20 +227,21 @@ class TasksResource(StatefulResource):
self.return_error(
resp, falcon.HTTP_400, message=ex.msg, retry=False)
@policy.ApiEnforcer('physical_provisioner:destroy_node')
def task_destroy_node(self, req, resp, json_data):
@policy.ApiEnforcer('physical_provisioner:destroy_nodes')
def task_destroy_nodes(self, req, resp, json_data):
"""Create async task for destroy nodes."""
action = json_data.get('action', None)
if action != 'destroy_node':
if action != 'destroy_nodes':
self.error(
req.context,
"Task body ended up in wrong handler: action %s in task_destroy_node"
"Task body ended up in wrong handler: action %s in task_destroy_nodes"
% action)
self.return_error(
resp, falcon.HTTP_500, message="Error", retry=False)
try:
task = self.create_task(json_data)
task = self.create_task(json_data, req.context)
resp.body = json.dumps(task.to_dict())
resp.append_header('Location',
"/api/v1.0/tasks/%s" % str(task.task_id))
@ -234,56 +251,53 @@ class TasksResource(StatefulResource):
self.return_error(
resp, falcon.HTTP_400, message=ex.msg, retry=False)
def create_task(self, task_body):
"""
Given the parsed body of a create task request, create the task
and start it in a thread
def create_task(self, task_body, req_context):
"""General task creation.
Given the parsed ``task_body`` of a create task request, create the task
and queue it in the database.
:param dict task_body: Dict representing the JSON body of a create task request
action - The action the task will execute
design_id - The design context the task will execute in
node_filter - A filter on which nodes will be affected by the task. The result is
an intersection of
applying all filters
node_names - A list of node hostnames
rack_names - A list of rack names that contain the nodes
node_tags - A list of tags applied to the nodes
design_ref - A URI reference to the design document set the task should operate on
node_filter - A filter on which nodes will be affected by the task.
:return: The Task object created
"""
design_id = task_body.get('design_id', None)
design_ref = task_body.get('design_ref', None)
node_filter = task_body.get('node_filter', None)
action = task_body.get('action', None)
if design_id is None or action is None:
if design_ref is None or action is None:
raise errors.InvalidFormat(
'Task creation requires fields design_id, action')
'Task creation requires fields design_ref, action')
task = self.orchestrator.create_task(
obj_task.OrchestratorTask,
design_id=design_id,
design_ref=design_ref,
action=action,
node_filter=node_filter)
task_thread = threading.Thread(
target=self.orchestrator.execute_task, args=[task.get_id()])
task_thread.start()
node_filter=node_filter,
context=req_context)
task.set_status(hd_fields.TaskStatus.Queued)
task.save()
return task
class TaskResource(StatefulResource):
"""Handler resource for /tasks/<id> singleton endpoint."""
def __init__(self, orchestrator=None, **kwargs):
super(TaskResource, self).__init__(**kwargs)
self.authorized_roles = ['user']
"""Object initializer.
:param orchestrator: instance of orchestrator.Orchestrator
"""
super().__init__(**kwargs)
self.orchestrator = orchestrator
@policy.ApiEnforcer('physical_provisioner:read_task')
def on_get(self, req, resp, task_id):
ctx = req.context
"""Handler for GET method."""
try:
task = self.state_manager.get_task(task_id)
task = self.state_manager.get_task(uuid.UUID(task_id))
if task is None:
self.info(req.context, "Task %s does not exist" % task_id)

View File

@ -11,93 +11,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from threading import Thread, Lock
import uuid
import time
import drydock_provisioner.objects.fields as hd_fields
import drydock_provisioner.statemgmt as statemgmt
import drydock_provisioner.objects.task as tasks
import drydock_provisioner.error as errors
# This is the interface for the orchestrator to access a driver
# TODO Need to have each driver spin up a seperate thread to manage
# driver tasks and feed them via queue
class ProviderDriver(object):
driver_name = "generic"
driver_key = "generic"
driver_desc = "Generic Provider Driver"
def __init__(self, orchestrator=None, state_manager=None, **kwargs):
if orchestrator is None:
raise ValueError("ProviderDriver requires valid orchestrator")
self.orchestrator = orchestrator
if state_manager is None:
raise ValueError("ProviderDriver requires valid state manager")
self.state_manager = state_manager
# These are the actions that this driver supports
self.supported_actions = [hd_fields.OrchestratorAction.Noop]
def execute_task(self, task_id):
task = self.state_manager.get_task(task_id)
task_action = task.action
if task_action in self.supported_actions:
task_runner = DriverTaskRunner(task_id, self.state_manager,
self.orchestrator)
task_runner.start()
while task_runner.is_alive():
time.sleep(1)
return
else:
raise errors.DriverError("Unsupported action %s for driver %s" %
(task_action, self.driver_desc))
# Execute a single task in a separate thread
class DriverTaskRunner(Thread):
def __init__(self, task_id, state_manager=None, orchestrator=None):
super(DriverTaskRunner, self).__init__()
self.orchestrator = orchestrator
if isinstance(state_manager, statemgmt.DesignState):
self.state_manager = state_manager
else:
raise DriverError("Invalid state manager specified")
self.task = self.state_manager.get_task(task_id)
return
def run(self):
self.execute_task()
def execute_task(self):
if self.task.action == hd_fields.OrchestratorAction.Noop:
self.orchestrator.task_field_update(
self.task.get_id(), status=hd_fields.TaskStatus.Running)
i = 0
while i < 5:
self.task = self.state_manager.get_task(self.task.get_id())
i = i + 1
if self.task.terminate:
self.orchestrator.task_field_update(
self.task.get_id(),
status=hd_fields.TaskStatus.Terminated)
return
else:
time.sleep(1)
self.orchestrator.task_field_update(
self.task.get_id(), status=hd_fields.TaskStatus.Complete)
return
"""Drivers for use by the orchestrator workflow."""

View File

@ -0,0 +1,86 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Abstract classes for use by implemented drivers."""
from threading import Thread
import time
import drydock_provisioner.objects.fields as hd_fields
import drydock_provisioner.statemgmt as statemgmt
import drydock_provisioner.error as errors
class ProviderDriver(object):
"""Generic driver for executing driver actions."""
driver_name = "generic"
driver_key = "generic"
driver_desc = "Generic Provider Driver"
def __init__(self, orchestrator=None, state_manager=None, **kwargs):
if orchestrator is None:
raise ValueError("ProviderDriver requires valid orchestrator")
self.orchestrator = orchestrator
if state_manager is None:
raise ValueError("ProviderDriver requires valid state manager")
self.state_manager = state_manager
# These are the actions that this driver supports
self.supported_actions = [hd_fields.OrchestratorAction.Noop]
def execute_task(self, task_id):
task = self.state_manager.get_task(task_id)
task_action = task.action
if task_action in self.supported_actions:
task_runner = DriverActionRunner(task_id, self.state_manager,
self.orchestrator)
task_runner.start()
while task_runner.is_alive():
time.sleep(1)
return
else:
raise errors.DriverError("Unsupported action %s for driver %s" %
(task_action, self.driver_desc))
# Execute a single driver action in a separate thread
class DriverActionRunner(Thread):
def __init__(self, action=None, state_manager=None, orchestrator=None):
super().__init__()
self.orchestrator = orchestrator
if isinstance(state_manager, statemgmt.DesignState):
self.state_manager = state_manager
else:
raise errors.DriverError("Invalid state manager specified")
self.action = action
def run(self):
self.run_action()
def run_action(self):
"""Run self.action.start() in this thread."""
if self.action is None:
raise errors.NoActionDefined(
"Runner started without a defined action.")
self.action.start()
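# Sketch of intended use (the action object here is hypothetical; any
# object exposing a start() method, per run_action() above, would do):
#   runner = DriverActionRunner(action=some_action,
#                               state_manager=state_mgr,
#                               orchestrator=orch)
#   runner.start()  # invokes some_action.start() in its own thread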

View File

@ -11,46 +11,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import drydock_provisioner.objects.fields as hd_fields
import drydock_provisioner.error as errors
from drydock_provisioner.drivers import ProviderDriver
class NodeDriver(ProviderDriver):
driver_name = "node_generic"
driver_key = "node_generic"
driver_desc = "Generic Node Driver"
def __init__(self, **kwargs):
super(NodeDriver, self).__init__(**kwargs)
self.supported_actions = [
hd_fields.OrchestratorAction.ValidateNodeServices,
hd_fields.OrchestratorAction.CreateNetworkTemplate,
hd_fields.OrchestratorAction.CreateStorageTemplate,
hd_fields.OrchestratorAction.CreateBootMedia,
hd_fields.OrchestratorAction.PrepareHardwareConfig,
hd_fields.OrchestratorAction.IdentifyNode,
hd_fields.OrchestratorAction.ConfigureHardware,
hd_fields.OrchestratorAction.InterrogateNode,
hd_fields.OrchestratorAction.ApplyNodeNetworking,
hd_fields.OrchestratorAction.ApplyNodeStorage,
hd_fields.OrchestratorAction.ApplyNodePlatform,
hd_fields.OrchestratorAction.DeployNode,
hd_fields.OrchestratorAction.DestroyNode,
hd_fields.OrchestratorAction.ConfigureUserCredentials
]
def execute_task(self, task_id):
task = self.state_manager.get_task(task_id)
task_action = task.action
if task_action in self.supported_actions:
return
else:
raise errors.DriverError("Unsupported action %s for driver %s" %
(task_action, self.driver_desc))
"""Generic driver for node provisioning."""

View File

@ -0,0 +1,56 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Generic driver for node provisioning."""
import drydock_provisioner.objects.fields as hd_fields
import drydock_provisioner.error as errors
from drydock_provisioner.drivers.driver import ProviderDriver
class NodeDriver(ProviderDriver):
driver_name = "node_generic"
driver_key = "node_generic"
driver_desc = "Generic Node Driver"
def __init__(self, **kwargs):
super(NodeDriver, self).__init__(**kwargs)
self.supported_actions = [
hd_fields.OrchestratorAction.ValidateNodeServices,
hd_fields.OrchestratorAction.CreateNetworkTemplate,
hd_fields.OrchestratorAction.CreateStorageTemplate,
hd_fields.OrchestratorAction.CreateBootMedia,
hd_fields.OrchestratorAction.PrepareHardwareConfig,
hd_fields.OrchestratorAction.IdentifyNode,
hd_fields.OrchestratorAction.ConfigureHardware,
hd_fields.OrchestratorAction.InterrogateNode,
hd_fields.OrchestratorAction.ApplyNodeNetworking,
hd_fields.OrchestratorAction.ApplyNodeStorage,
hd_fields.OrchestratorAction.ApplyNodePlatform,
hd_fields.OrchestratorAction.DeployNode,
hd_fields.OrchestratorAction.DestroyNode,
hd_fields.OrchestratorAction.ConfigureUserCredentials
]
def execute_task(self, task_id):
task = self.state_manager.get_task(task_id)
task_action = task.action
if task_action in self.supported_actions:
return
else:
raise errors.DriverError("Unsupported action %s for driver %s" %
(task_action, self.driver_desc))

View File

@ -11,3 +11,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Driver for provisioning nodes via Canonical MaaS."""

File diff suppressed because it is too large Load Diff

View File

@ -11,6 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Client for submitting authenticated requests to MaaS API."""
import logging
from oauthlib import oauth1
@ -58,7 +60,7 @@ class MaasRequestFactory(object):
self.signer = MaasOauth(apikey)
self.http_session = requests.Session()
# TODO Get logger name from config
# TODO(sh8121att) Get logger name from config
self.logger = logging.getLogger('drydock')
def get(self, endpoint, **kwargs):
@ -129,20 +131,19 @@ class MaasRequestFactory(object):
continue
elif isinstance(v, list):
for i in v:
files_tuples.append(
(k, (None, base64.b64encode(
str(i).encode('utf-8')).decode('utf-8'),
'text/plain; charset="utf-8"', {
'Content-Transfer-Encoding': 'base64'
})))
value = base64.b64encode(
str(i).encode('utf-8')).decode('utf-8')
content_type = 'text/plain; charset="utf-8"'
part_headers = {'Content-Transfer-Encoding': 'base64'}
files_tuples.append((k, (None, value, content_type,
part_headers)))
else:
files_tuples.append((k, (None, base64.b64encode(
str(v).encode('utf-8')).decode('utf-8'),
'text/plain; charset="utf-8"', {
'Content-Transfer-Encoding':
'base64'
})))
value = base64.b64encode(
str(v).encode('utf-8')).decode('utf-8')
content_type = 'text/plain; charset="utf-8"'
part_headers = {'Content-Transfer-Encoding': 'base64'}
files_tuples.append((k, (None, value, content_type,
part_headers)))
kwargs['files'] = files_tuples
params = kwargs.get('params', None)
@ -154,7 +155,7 @@ class MaasRequestFactory(object):
elif 'op' in kwargs.keys():
kwargs.pop('op')
# TODO timeouts should be configurable
# TODO(sh8121att) timeouts should be configurable
timeout = kwargs.pop('timeout', None)
if timeout is None:
timeout = (2, 30)
@ -173,10 +174,8 @@ class MaasRequestFactory(object):
if resp.status_code >= 400:
self.logger.debug(
"FAILED API CALL:\nURL: %s %s\nBODY:\n%s\nRESPONSE: %s\nBODY:\n%s"
% (prepared_req.method, prepared_req.url,
str(prepared_req.body).replace('\\r\\n', '\n'),
resp.status_code, resp.text))
"Received error response - URL: %s %s - RESPONSE: %s" %
(prepared_req.method, prepared_req.url, resp.status_code))
raise errors.DriverError("MAAS Error: %s - %s" % (resp.status_code,
resp.text))
return resp

File diff suppressed because it is too large Load Diff

View File

@ -10,4 +10,5 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# limitations under the License.
"""Models for representing MaaS API resources."""

View File

@ -56,7 +56,7 @@ class ResourceBase(object):
def delete(self):
"""Delete this resource in MaaS."""
url = self.interpolate_url()
resp = self.api_client.delete(url)
self.api_client.delete(url)
"""
Parse URL for placeholders and replace them with current

View File

@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
"""Models for MaaS Fabric resources."""
import drydock_provisioner.drivers.node.maasdriver.models.base as model_base
import drydock_provisioner.drivers.node.maasdriver.models.vlan as model_vlan

View File

@ -132,7 +132,9 @@ class Interface(model_base.ResourceBase):
resp = self.api_client.post(
url,
op='unlink_subnet',
files={'id': l.get('resource_id')})
files={
'id': l.get('resource_id')
})
if not resp.ok:
raise errors.DriverError("Error unlinking subnet")

View File

@ -285,7 +285,9 @@ class Machine(model_base.ResourceBase):
url = self.interpolate_url()
resp = self.api_client.post(
url, op='set_owner_data', files={key: value})
url, op='set_owner_data', files={
key: value
})
if resp.status_code != 200:
self.logger.error(
@ -335,7 +337,8 @@ class Machine(model_base.ResourceBase):
if isinstance(obj_dict['boot_interface'], dict):
refined_dict['boot_mac'] = obj_dict['boot_interface'][
'mac_address']
refined_dict['boot_ip'] = obj_dict['boot_interface']['links'][0]['ip_address']
refined_dict['boot_ip'] = obj_dict['boot_interface']['links'][
0]['ip_address']
i = cls(api_client, **refined_dict)
return i
@ -378,7 +381,9 @@ class Machines(model_base.ResourceCollectionBase):
url = self.interpolate_url()
resp = self.api_client.post(
url, op='allocate', files={'system_id': node.resource_id})
url, op='allocate', files={
'system_id': node.resource_id
})
if not resp.ok:
self.logger.error(

View File

@ -13,10 +13,7 @@
# limitations under the License.
"""Model for MaaS rack-controller API resource."""
import bson
import drydock_provisioner.drivers.node.maasdriver.models.base as model_base
import drydock_provisioner.drivers.node.maasdriver.models.interface as maas_interface
import drydock_provisioner.drivers.node.maasdriver.models.machine as maas_machine

View File

@ -11,8 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Model for MaaS SSH Key resources."""
import drydock_provisioner.error as errors
import drydock_provisioner.drivers.node.maasdriver.models.base as model_base
@ -25,7 +25,7 @@ class SshKey(model_base.ResourceBase):
def __init__(self, api_client, **kwargs):
super(SshKey, self).__init__(api_client, **kwargs)
#Keys should never have newlines, but sometimes they get added
# Keys should never have newlines, but sometimes they get added
self.key = self.key.replace("\n", "")

View File

@ -0,0 +1,52 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Model representing MaaS static routes resource."""
import drydock_provisioner.drivers.node.maasdriver.models.base as model_base
class StaticRoute(model_base.ResourceBase):
"""Model for single static route."""
resource_url = 'static-routes/{resource_id}/'
fields = ['resource_id', 'source', 'destination', 'gateway_ip', 'metric']
json_fields = ['source', 'destination', 'gateway_ip', 'metric']
def __init__(self, api_client, **kwargs):
super().__init__(api_client, **kwargs)
@classmethod
def from_dict(cls, api_client, obj_dict):
refined_dict = {k: obj_dict.get(k, None) for k in cls.fields}
if 'id' in obj_dict.keys():
refined_dict['resource_id'] = obj_dict.get('id')
if isinstance(refined_dict.get('source', None), dict):
refined_dict['source'] = refined_dict['source']['id']
if isinstance(refined_dict.get('destination', None), dict):
refined_dict['destination'] = refined_dict['destination']['id']
i = cls(api_client, **refined_dict)
return i
class StaticRoutes(model_base.ResourceCollectionBase):
"""Model for a collection of static routes."""
collection_url = 'static-routes/'
collection_resource = StaticRoute
def __init__(self, api_client, **kwargs):
super().__init__(api_client, **kwargs)

View File

@ -13,9 +13,11 @@
# limitations under the License.
import drydock_provisioner.drivers.node.maasdriver.models.base as model_base
import drydock_provisioner.drivers.node.maasdriver.models.iprange as maas_iprange
import drydock_provisioner.drivers.node.maasdriver.models.staticroute as maas_route
class Subnet(model_base.ResourceBase):
"""Model for a subnet."""
resource_url = 'subnets/{resource_id}/'
fields = [
@ -72,6 +74,30 @@ class Subnet(model_base.ResourceBase):
maas_ranges = maas_iprange.IpRanges(self.api_client)
maas_ranges.add(maas_range)
def add_static_route(self, dest_subnet, gateway, metric=100):
"""Add a static route from this subnet to ``dest_subnet`` via ``gateway``.
:param dest_subnet: maas resource_id of the destination subnet
:param gateway: string IP address of the nexthop gateway
:param metric: weight to assign this gateway
"""
sr = maas_route.StaticRoutes(self.api_client)
current_route = sr.singleton({
'source': self.resource_id,
'destination': dest_subnet
})
if current_route is not None:
current_route.delete()
new_route = maas_route.StaticRoute(
self.api_client,
source=self.resource_id,
destination=dest_subnet,
gateway_ip=gateway,
metric=metric)
new_route = sr.add(new_route)
return new_route
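# Hypothetical usage (resource ids and addresses are illustrative);
# assumes ``subnet`` is a bound Subnet instance with a valid api_client:
#   route = subnet.add_static_route(dest_subnet=5, gateway='172.16.0.1')
# Any existing route for the same source/destination pair is deleted
# first, so the call is effectively an upsert.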
@classmethod
def from_dict(cls, api_client, obj_dict):
"""
@ -92,6 +118,7 @@ class Subnet(model_base.ResourceBase):
class Subnets(model_base.ResourceCollectionBase):
"""Model for collection of subnets."""
collection_url = 'subnets/'
collection_resource = Subnet

View File

@ -11,12 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Models for MaaS Tag resources."""
import drydock_provisioner.error as errors
import drydock_provisioner.drivers.node.maasdriver.models.base as model_base
import yaml
class Tag(model_base.ResourceBase):
@ -71,7 +70,9 @@ class Tag(model_base.ResourceBase):
url = self.interpolate_url()
resp = self.api_client.post(
url, op='update_nodes', files={'add': system_id})
url, op='update_nodes', files={
'add': system_id
})
if not resp.ok:
self.logger.error(

View File

@ -11,9 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
"""Models representing MaaS VLAN resources."""
import drydock_provisioner.error as errors
import drydock_provisioner.drivers.node.maasdriver.models.base as model_base

View File

@ -103,7 +103,9 @@ class VolumeGroup(model_base.ResourceBase):
url = self.interpolate_url()
resp = self.api_client.post(
url, op='delete_logical_volume', files={'id': target_lv})
url, op='delete_logical_volume', files={
'id': target_lv
})
if not resp.ok:
raise Exception("MAAS error - %s - %s" % (resp.status_code,

View File

@ -11,53 +11,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import drydock_provisioner.objects.fields as hd_fields
import drydock_provisioner.error as errors
from drydock_provisioner.drivers import ProviderDriver
class OobDriver(ProviderDriver):
oob_types_supported = ['']
def __init__(self, **kwargs):
super(OobDriver, self).__init__(**kwargs)
self.supported_actions = [
hd_fields.OrchestratorAction.ValidateOobServices,
hd_fields.OrchestratorAction.ConfigNodePxe,
hd_fields.OrchestratorAction.SetNodeBoot,
hd_fields.OrchestratorAction.PowerOffNode,
hd_fields.OrchestratorAction.PowerOnNode,
hd_fields.OrchestratorAction.PowerCycleNode,
hd_fields.OrchestratorAction.InterrogateOob
]
self.driver_name = "oob_generic"
self.driver_key = "oob_generic"
self.driver_desc = "Generic OOB Driver"
def execute_task(self, task_id):
task = self.state_manager.get_task(task_id)
task_action = task.action
if task_action in self.supported_actions:
return
else:
raise DriverError("Unsupported action %s for driver %s" %
(task_action, self.driver_desc))
@classmethod
def oob_type_support(cls, type_string):
"""
Does this driver support a particular OOB type
:param type_string: OOB type to check
"""
if type_string in cls.oob_types_supported:
return True
return False
"""Generic OOB mgmt driver."""

View File

@ -0,0 +1,52 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Generic OOB driver."""
import drydock_provisioner.objects.fields as hd_fields
from drydock_provisioner.drivers.driver import ProviderDriver
class OobDriver(ProviderDriver):
"""Generic driver for OOB actions."""
oob_types_supported = ['']
def __init__(self, **kwargs):
super(OobDriver, self).__init__(**kwargs)
self.supported_actions = [
hd_fields.OrchestratorAction.ValidateOobServices,
hd_fields.OrchestratorAction.ConfigNodePxe,
hd_fields.OrchestratorAction.SetNodeBoot,
hd_fields.OrchestratorAction.PowerOffNode,
hd_fields.OrchestratorAction.PowerOnNode,
hd_fields.OrchestratorAction.PowerCycleNode,
hd_fields.OrchestratorAction.InterrogateOob
]
self.driver_name = "oob_generic"
self.driver_key = "oob_generic"
self.driver_desc = "Generic OOB Driver"
@classmethod
def oob_type_support(cls, type_string):
"""Check if this driver supports a particular OOB type.
:param type_string: OOB type to check
"""
if type_string in cls.oob_types_supported:
return True
return False
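# Illustrative check: the generic base class declares no concrete OOB
# types (oob_types_supported is ['']), so for any real type string:
#   OobDriver.oob_type_support('ipmi')  # False for this base class
# Concrete drivers override oob_types_supported to claim their types.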

View File

@ -10,4 +10,5 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# limitations under the License.
"""Driver for manually executing OOB actions."""

View File

@ -11,6 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Noop driver for manually executing OOB tasks."""
import time
import logging
@ -19,10 +21,8 @@ from oslo_config import cfg
import drydock_provisioner.error as errors
import drydock_provisioner.objects.fields as hd_fields
import drydock_provisioner.objects.task as task_model
import drydock_provisioner.drivers.oob as oob
import drydock_provisioner.drivers as drivers
import drydock_provisioner.drivers.oob.driver as oob
class ManualDriver(oob.OobDriver):
@ -52,9 +52,9 @@ class ManualDriver(oob.OobDriver):
"Driver %s doesn't support task action %s" % (self.driver_desc,
task.action))
design_id = getattr(task, 'design_id', None)
design_ref = task.design_ref
if design_id is None:
if design_ref is None:
raise errors.DriverError("No design reference specified in task %s" %
(task_id))
@ -66,7 +66,8 @@ class ManualDriver(oob.OobDriver):
time.sleep(60)
self.orchestrator.task_field_update(
task.get_id(),
status=hd_fields.TaskStatus.Complete,
result=hd_fields.ActionResult.Success)
task.set_status(hd_fields.TaskStatus.Complete)
task.success()
task.save()
return

View File

@ -11,435 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import logging
from oslo_config import cfg
from pyghmi.ipmi.command import Command
from pyghmi.exceptions import IpmiException
import drydock_provisioner.error as errors
import drydock_provisioner.objects.fields as hd_fields
import drydock_provisioner.objects.task as task_model
import drydock_provisioner.drivers.oob as oob
import drydock_provisioner.drivers as drivers
class PyghmiDriver(oob.OobDriver):
pyghmi_driver_options = [
cfg.IntOpt(
'poll_interval',
default=10,
help='Polling interval in seconds for querying IPMI status'),
]
oob_types_supported = ['ipmi']
driver_name = "pyghmi_driver"
driver_key = "pyghmi_driver"
driver_desc = "Pyghmi OOB Driver"
oob_types_supported = ['ipmi']
def __init__(self, **kwargs):
super(PyghmiDriver, self).__init__(**kwargs)
cfg.CONF.register_opts(
PyghmiDriver.pyghmi_driver_options, group=PyghmiDriver.driver_key)
self.logger = logging.getLogger(cfg.CONF.logging.oobdriver_logger_name)
def execute_task(self, task_id):
task = self.state_manager.get_task(task_id)
if task is None:
self.logger.error("Invalid task %s" % (task_id))
raise errors.DriverError("Invalid task %s" % (task_id))
if task.action not in self.supported_actions:
self.logger.error("Driver %s doesn't support task action %s" %
(self.driver_desc, task.action))
raise errors.DriverError(
"Driver %s doesn't support task action %s" % (self.driver_desc,
task.action))
design_id = getattr(task, 'design_id', None)
if design_id is None:
raise errors.DriverError("No design ID specified in task %s" %
(task_id))
self.orchestrator.task_field_update(
task.get_id(), status=hd_fields.TaskStatus.Running)
if task.action == hd_fields.OrchestratorAction.ValidateOobServices:
self.orchestrator.task_field_update(
task.get_id(),
status=hd_fields.TaskStatus.Complete,
result=hd_fields.ActionResult.Success)
return
site_design = self.orchestrator.get_effective_site(design_id)
target_nodes = []
if len(task.node_list) > 0:
target_nodes.extend([
x for x in site_design.baremetal_nodes
if x.get_name() in task.node_list
])
else:
target_nodes.extend(site_design.baremetal_nodes)
incomplete_subtasks = []
# For each target node, create a subtask and kick off a runner
for n in target_nodes:
subtask = self.orchestrator.create_task(
task_model.DriverTask,
parent_task_id=task.get_id(),
design_id=design_id,
action=task.action,
task_scope={'node_names': [n.get_name()]})
incomplete_subtasks.append(subtask.get_id())
runner = PyghmiTaskRunner(
state_manager=self.state_manager,
orchestrator=self.orchestrator,
task_id=subtask.get_id(),
node=n)
runner.start()
attempts = 0
max_attempts = getattr(cfg.CONF.timeouts, task.action,
cfg.CONF.timeouts.drydock_timeout) * (
60 / cfg.CONF.pyghmi_driver.poll_interval)
while (len(incomplete_subtasks) > 0 and attempts <= max_attempts):
for n in incomplete_subtasks:
t = self.state_manager.get_task(n)
if t.get_status() in [
hd_fields.TaskStatus.Terminated,
hd_fields.TaskStatus.Complete,
hd_fields.TaskStatus.Errored
]:
incomplete_subtasks.remove(n)
time.sleep(cfg.CONF.pyghmi_driver.poll_interval)
attempts = attempts + 1
task = self.state_manager.get_task(task.get_id())
subtasks = map(self.state_manager.get_task, task.get_subtasks())
success_subtasks = [
x for x in subtasks
if x.get_result() == hd_fields.ActionResult.Success
]
nosuccess_subtasks = [
x for x in subtasks
if x.get_result() in [
hd_fields.ActionResult.PartialSuccess,
hd_fields.ActionResult.Failure
]
]
task_result = None
if len(success_subtasks) > 0 and len(nosuccess_subtasks) > 0:
task_result = hd_fields.ActionResult.PartialSuccess
elif len(success_subtasks) == 0 and len(nosuccess_subtasks) > 0:
task_result = hd_fields.ActionResult.Failure
elif len(success_subtasks) > 0 and len(nosuccess_subtasks) == 0:
task_result = hd_fields.ActionResult.Success
else:
task_result = hd_fields.ActionResult.Incomplete
self.orchestrator.task_field_update(
task.get_id(),
result=task_result,
status=hd_fields.TaskStatus.Complete)
return
class PyghmiTaskRunner(drivers.DriverTaskRunner):
def __init__(self, node=None, **kwargs):
super(PyghmiTaskRunner, self).__init__(**kwargs)
self.logger = logging.getLogger('drydock.oobdriver.pyghmi')
# We cheat here by providing the Node model instead
# of making the runner source it from statemgmt
if node is None:
self.logger.error("Did not specify target node")
raise errors.DriverError("Did not specify target node")
self.node = node
def execute_task(self):
task_action = self.task.action
if len(self.task.node_list) != 1:
self.orchestrator.task_field_update(
self.task.get_id(),
result=hd_fields.ActionResult.Incomplete,
status=hd_fields.TaskStatus.Errored)
raise errors.DriverError(
"Multiple names (%s) in task %s node_list" %
(len(self.task.node_list), self.task.get_id()))
target_node_name = self.task.node_list[0]
if self.node.get_name() != target_node_name:
self.orchestrator.task_field_update(
self.task.get_id(),
result=hd_fields.ActionResult.Incomplete,
status=hd_fields.TaskStatus.Errored)
raise errors.DriverError("Runner node does not match " \
"task node scope")
self.orchestrator.task_field_update(
self.task.get_id(), status=hd_fields.TaskStatus.Running)
if task_action == hd_fields.OrchestratorAction.ConfigNodePxe:
self.orchestrator.task_field_update(
self.task.get_id(),
result=hd_fields.ActionResult.Failure,
status=hd_fields.TaskStatus.Complete)
return
elif task_action == hd_fields.OrchestratorAction.SetNodeBoot:
worked = False
self.logger.debug("Setting bootdev to PXE for %s" % self.node.name)
self.exec_ipmi_command(Command.set_bootdev, 'pxe')
time.sleep(3)
bootdev = self.exec_ipmi_command(Command.get_bootdev)
if bootdev.get('bootdev', '') == 'network':
self.logger.debug(
"%s reports bootdev of network" % self.node.name)
self.orchestrator.task_field_update(
self.task.get_id(),
result=hd_fields.ActionResult.Success,
status=hd_fields.TaskStatus.Complete)
return
else:
self.logger.warning("%s reports bootdev of %s" %
(self.node.name,
bootdev.get('bootdev', None)))
worked = False
self.logger.error(
"Giving up on IPMI command to %s after 3 attempts" %
self.node.name)
self.orchestrator.task_field_update(
self.task.get_id(),
result=hd_fields.ActionResult.Failure,
status=hd_fields.TaskStatus.Complete)
return
elif task_action == hd_fields.OrchestratorAction.PowerOffNode:
worked = False
self.logger.debug(
"Sending set_power = off command to %s" % self.node.name)
self.exec_ipmi_command(Command.set_power, 'off')
i = 18
while i > 0:
self.logger.debug("Polling powerstate waiting for success.")
power_state = self.exec_ipmi_command(Command.get_power)
if power_state.get('powerstate', '') == 'off':
self.logger.debug("Node reports powerstate of off")
worked = True
break
time.sleep(10)
i = i - 1
if worked:
self.orchestrator.task_field_update(
self.task.get_id(),
result=hd_fields.ActionResult.Success,
status=hd_fields.TaskStatus.Complete)
else:
self.logger.error(
"Giving up on IPMI command to %s" % self.node.name)
self.orchestrator.task_field_update(
self.task.get_id(),
result=hd_fields.ActionResult.Failure,
status=hd_fields.TaskStatus.Complete)
return
elif task_action == hd_fields.OrchestratorAction.PowerOnNode:
worked = False
self.logger.debug(
"Sending set_power = off command to %s" % self.node.name)
self.exec_ipmi_command(Command.set_power, 'off')
i = 18
while i > 0:
self.logger.debug("Polling powerstate waiting for success.")
power_state = self.exec_ipmi_command(Command.get_power)
if power_state.get('powerstate', '') == 'off':
self.logger.debug("Node reports powerstate of off")
worked = True
break
time.sleep(10)
i = i - 1
if worked:
self.orchestrator.task_field_update(
self.task.get_id(),
result=hd_fields.ActionResult.Success,
status=hd_fields.TaskStatus.Complete)
else:
self.logger.error(
"Giving up on IPMI command to %s" % self.node.name)
self.orchestrator.task_field_update(
self.task.get_id(),
result=hd_fields.ActionResult.Failure,
status=hd_fields.TaskStatus.Complete)
return
elif task_action == hd_fields.OrchestratorAction.PowerCycleNode:
self.logger.debug(
"Sending set_power = off command to %s" % self.node.name)
self.exec_ipmi_command(Command.set_power, 'off')
# Wait for power state of off before booting back up
# We'll wait for up to 3 minutes to power off
i = 18
while i > 0:
power_state = self.exec_ipmi_command(Command.get_power)
if power_state is not None and power_state.get(
'powerstate', '') == 'off':
self.logger.debug(
"%s reports powerstate of off" % self.node.name)
break
elif power_state is None:
self.logger.debug("None response on IPMI power query to %s"
% self.node.name)
time.sleep(10)
i = i - 1
if power_state.get('powerstate', '') == 'on':
self.logger.warning(
"Failed powering down node %s during power cycle task" %
self.node.name)
self.orchestrator.task_field_update(
self.task.get_id(),
result=hd_fields.ActionResult.Failure,
status=hd_fields.TaskStatus.Complete)
return
self.logger.debug(
"Sending set_power = on command to %s" % self.node.name)
self.exec_ipmi_command(Command.set_power, 'on')
i = 18
while i > 0:
power_state = self.exec_ipmi_command(Command.get_power)
if power_state is not None and power_state.get(
'powerstate', '') == 'on':
self.logger.debug(
"%s reports powerstate of on" % self.node.name)
break
elif power_state is None:
self.logger.debug("None response on IPMI power query to %s"
% self.node.name)
time.sleep(10)
i = i - 1
if power_state.get('powerstate', '') == 'on':
self.orchestrator.task_field_update(
self.task.get_id(),
result=hd_fields.ActionResult.Success,
status=hd_fields.TaskStatus.Complete)
else:
self.logger.warning(
"Failed powering up node %s during power cycle task" %
self.node.name)
self.orchestrator.task_field_update(
self.task.get_id(),
result=hd_fields.ActionResult.Failure,
status=hd_fields.TaskStatus.Complete)
return
elif task_action == hd_fields.OrchestratorAction.InterrogateOob:
mci_id = self.exec_ipmi_command(Command.get_mci)
self.orchestrator.task_field_update(
self.task.get_id(),
result=hd_fields.ActionResult.Success,
status=hd_fields.TaskStatus.Complete,
result_detail=mci_id)
return
def get_ipmi_session(self):
"""
Initialize a Pyghmi IPMI session to this runner's self.node
:return: An instance of pyghmi.ipmi.command.Command initialized to nodes' IPMI interface
"""
node = self.node
if node.oob_type != 'ipmi':
raise errors.DriverError("Node OOB type is not IPMI")
ipmi_network = self.node.oob_parameters['network']
ipmi_address = self.node.get_network_address(ipmi_network)
if ipmi_address is None:
raise errors.DriverError("Node %s has no IPMI address" %
(node.name))
ipmi_account = self.node.oob_parameters['account']
ipmi_credential = self.node.oob_parameters['credential']
self.logger.debug("Starting IPMI session to %s with %s/%s" %
(ipmi_address, ipmi_account, ipmi_credential[:1]))
ipmi_session = Command(
bmc=ipmi_address, userid=ipmi_account, password=ipmi_credential)
return ipmi_session
def exec_ipmi_command(self, callable, *args):
"""
Call an IPMI command after establishing a session with this runner's node
:param callable: The pyghmi Command method to call
:param args: The args to pass the callable
"""
attempts = 0
while attempts < 5:
try:
self.logger.debug("Initializing IPMI session")
ipmi_session = self.get_ipmi_session()
except IpmiException as iex:
self.logger.error("Error initializing IPMI session for node %s"
% self.node.name)
self.logger.debug("IPMI Exception: %s" % str(iex))
self.logger.warning(
"IPMI command failed, retrying after 15 seconds...")
time.sleep(15)
attempts = attempts + 1
continue
try:
self.logger.debug("Calling IPMI command %s on %s" %
(callable.__name__, self.node.name))
response = callable(ipmi_session, *args)
ipmi_session.ipmi_session.logout()
return response
except IpmiException as iex:
self.logger.error("Error sending command: %s" % str(iex))
self.logger.warning(
"IPMI command failed, retrying after 15 seconds...")
time.sleep(15)
attempts = attempts + 1
def list_opts():
return {PyghmiDriver.driver_key: PyghmiDriver.pyghmi_driver_options}
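The `execute_task` removed above rolls subtask results up into a single parent result (PartialSuccess vs. Failure vs. Success vs. Incomplete). That roll-up can be sketched as a standalone helper; the result names mirror drydock's `hd_fields.ActionResult` values, but everything else here is illustrative:

```python
# Minimal sketch of the subtask result roll-up removed above.
# Result strings mirror hd_fields.ActionResult; the rest is illustrative.

SUCCESS = 'success'
PARTIAL_SUCCESS = 'partial_success'
FAILURE = 'failure'
INCOMPLETE = 'incomplete'


def align_result(subtask_results):
    """Collapse a list of subtask results into one parent result."""
    ok = [r for r in subtask_results if r == SUCCESS]
    bad = [r for r in subtask_results if r in (PARTIAL_SUCCESS, FAILURE)]
    if ok and bad:
        return PARTIAL_SUCCESS
    if bad:
        return FAILURE
    if ok:
        return SUCCESS
    return INCOMPLETE
```

The refactor moves this logic into `Task.align_result()`, which the new driver calls once all futures have been reaped.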

View File

@ -0,0 +1,423 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Driver for controlling OOB interface via IPMI.
Based on Openstack Ironic Pyghmi driver.
"""
import time
from pyghmi.ipmi.command import Command
from pyghmi.exceptions import IpmiException
from drydock_provisioner.orchestrator.actions.orchestrator import BaseAction
import drydock_provisioner.error as errors
import drydock_provisioner.objects.fields as hd_fields
class PyghmiBaseAction(BaseAction):
"""Base action for Pyghmi executed actions."""
def get_ipmi_session(self, node):
"""Initialize a Pyghmi IPMI session to the node.
:param node: instance of objects.BaremetalNode
:return: An instance of pyghmi.ipmi.command.Command initialized to the node's IPMI interface
"""
if node.oob_type != 'ipmi':
raise errors.DriverError("Node OOB type is not IPMI")
ipmi_network = node.oob_parameters['network']
ipmi_address = node.get_network_address(ipmi_network)
if ipmi_address is None:
raise errors.DriverError("Node %s has no IPMI address" %
(node.name))
ipmi_account = node.oob_parameters['account']
ipmi_credential = node.oob_parameters['credential']
self.logger.debug("Starting IPMI session to %s with %s/%s" %
(ipmi_address, ipmi_account, ipmi_credential[:1]))
ipmi_session = Command(
bmc=ipmi_address, userid=ipmi_account, password=ipmi_credential)
return ipmi_session
def exec_ipmi_command(self, node, func, *args):
"""Call an IPMI command after establishing a session.
:param node: Instance of objects.BaremetalNode to execute against
:param func: The pyghmi Command method to call
:param args: The args to pass the func
"""
attempts = 0
while attempts < 5:
try:
self.logger.debug("Initializing IPMI session")
ipmi_session = self.get_ipmi_session(node)
except (IpmiException, errors.DriverError) as iex:
self.logger.error("Error initializing IPMI session for node %s"
% node.name)
self.logger.debug("IPMI Exception: %s" % str(iex))
self.logger.warning(
"IPMI command failed, retrying after 15 seconds...")
time.sleep(15)
attempts = attempts + 1
continue
try:
self.logger.debug("Calling IPMI command %s on %s" %
(func.__name__, node.name))
response = func(ipmi_session, *args)
ipmi_session.ipmi_session.logout()
return response
except IpmiException as iex:
self.logger.error("Error sending command: %s" % str(iex))
self.logger.warning(
"IPMI command failed, retrying after 15 seconds...")
time.sleep(15)
attempts = attempts + 1
raise errors.DriverError("IPMI command failed.")
class ValidateOobServices(PyghmiBaseAction):
"""Action to validation OOB services are available."""
def start(self):
self.task.add_status_msg(
msg="OOB does not require services.",
error=False,
ctx='NA',
ctx_type='NA')
self.task.set_status(hd_fields.TaskStatus.Complete)
self.task.success()
self.task.save()
return
class ConfigNodePxe(PyghmiBaseAction):
"""Action to configure PXE booting via OOB."""
def start(self):
self.task.set_status(hd_fields.TaskStatus.Running)
self.task.save()
design_status, site_design = self.orchestrator.get_effective_site(
self.task.design_ref)
node_list = self.orchestrator.process_node_filter(
self.task.node_filter, site_design)
for n in node_list:
self.task.add_status_msg(
msg="Pyghmi doesn't configure PXE options.",
error=True,
ctx=n.name,
ctx_type='node')
self.task.set_status(hd_fields.TaskStatus.Complete)
self.task.failure()
self.task.save()
return
class SetNodeBoot(PyghmiBaseAction):
"""Action to configure a node to PXE boot."""
def start(self):
self.task.set_status(hd_fields.TaskStatus.Running)
self.task.save()
design_status, site_design = self.orchestrator.get_effective_site(
self.task.design_ref)
node_list = self.orchestrator.process_node_filter(
self.task.node_filter, site_design)
for n in node_list:
self.logger.debug("Setting bootdev to PXE for %s" % n.name)
self.task.add_status_msg(
msg="Setting node to PXE boot.",
error=False,
ctx=n.name,
ctx_type='node')
self.exec_ipmi_command(n, Command.set_bootdev, 'pxe')
time.sleep(3)
bootdev = self.exec_ipmi_command(n, Command.get_bootdev)
if bootdev is not None and (bootdev.get('bootdev',
'') == 'network'):
self.task.add_status_msg(
msg="Set bootdev to PXE.",
error=False,
ctx=n.name,
ctx_type='node')
self.logger.debug("%s reports bootdev of network" % n.name)
self.task.success()
else:
self.task.add_status_msg(
msg="Unable to set bootdev to PXE.",
error=True,
ctx=n.name,
ctx_type='node')
self.task.failure()
self.logger.warning("Unable to set node %s to PXE boot." %
(n.name))
self.task.set_status(hd_fields.TaskStatus.Complete)
self.task.save()
return
class PowerOffNode(PyghmiBaseAction):
"""Action to power off a node via IPMI."""
def start(self):
self.task.set_status(hd_fields.TaskStatus.Running)
self.task.save()
design_status, site_design = self.orchestrator.get_effective_site(
self.task.design_ref)
node_list = self.orchestrator.process_node_filter(
self.task.node_filter, site_design)
for n in node_list:
self.logger.debug("Sending set_power = off command to %s" % n.name)
self.task.add_status_msg(
msg="Sending set_power = off command.",
error=False,
ctx=n.name,
ctx_type='node')
self.exec_ipmi_command(n, Command.set_power, 'off')
i = 18
while i > 0:
self.logger.debug("Polling powerstate waiting for success.")
power_state = self.exec_ipmi_command(n, Command.get_power)
if power_state is not None and (power_state.get(
'powerstate', '') == 'off'):
self.task.add_status_msg(
msg="Node reports power off.",
error=False,
ctx=n.name,
ctx_type='node')
self.logger.debug(
"Node %s reports powerstate of off" % n.name)
self.task.success()
break
time.sleep(10)
i = i - 1
if power_state is not None and (power_state.get('powerstate', '')
!= 'off'):
self.task.add_status_msg(
msg="Node failed to power off.",
error=True,
ctx=n.name,
ctx_type='node')
self.logger.error("Giving up on IPMI command to %s" % n.name)
self.task.failure()
self.task.set_status(hd_fields.TaskStatus.Complete)
self.task.save()
return
class PowerOnNode(PyghmiBaseAction):
"""Action to power on a node via IPMI."""
def start(self):
self.task.set_status(hd_fields.TaskStatus.Running)
self.task.save()
design_status, site_design = self.orchestrator.get_effective_site(
self.task.design_ref)
node_list = self.orchestrator.process_node_filter(
self.task.node_filter, site_design)
for n in node_list:
self.logger.debug("Sending set_power = off command to %s" % n.name)
self.task.add_status_msg(
msg="Sending set_power = on command.",
error=False,
ctx=n.name,
ctx_type='node')
            self.exec_ipmi_command(n, Command.set_power, 'on')
i = 18
while i > 0:
self.logger.debug("Polling powerstate waiting for success.")
power_state = self.exec_ipmi_command(n, Command.get_power)
if power_state is not None and (power_state.get(
'powerstate', '') == 'on'):
self.logger.debug(
"Node %s reports powerstate of on" % n.name)
self.task.add_status_msg(
msg="Node reports power on.",
error=False,
ctx=n.name,
ctx_type='node')
self.task.success()
break
time.sleep(10)
i = i - 1
if power_state is not None and (power_state.get('powerstate', '')
!= 'on'):
self.task.add_status_msg(
msg="Node failed to power on.",
error=True,
ctx=n.name,
ctx_type='node')
self.logger.error("Giving up on IPMI command to %s" % n.name)
self.task.failure()
self.task.set_status(hd_fields.TaskStatus.Complete)
self.task.save()
return
class PowerCycleNode(PyghmiBaseAction):
"""Action to hard powercycle a node via IPMI."""
def start(self):
self.task.set_status(hd_fields.TaskStatus.Running)
self.task.save()
design_status, site_design = self.orchestrator.get_effective_site(
self.task.design_ref)
node_list = self.orchestrator.process_node_filter(
self.task.node_filter, site_design)
for n in node_list:
self.logger.debug("Sending set_power = off command to %s" % n.name)
self.task.add_status_msg(
msg="Power cycling node via IPMI.",
error=False,
ctx=n.name,
ctx_type='node')
self.exec_ipmi_command(n, Command.set_power, 'off')
# Wait for power state of off before booting back up
# We'll wait for up to 3 minutes to power off
i = 18
while i > 0:
power_state = self.exec_ipmi_command(n, Command.get_power)
if power_state is not None and power_state.get(
'powerstate', '') == 'off':
self.logger.debug("%s reports powerstate of off" % n.name)
break
elif power_state is None:
self.logger.debug(
"No response on IPMI power query to %s" % n.name)
time.sleep(10)
i = i - 1
            if power_state is not None and power_state.get('powerstate',
                                                           '') == 'on':
self.task.add_status_msg(
msg="Failed to power down during power cycle.",
error=True,
ctx=n.name,
ctx_type='node')
self.logger.warning(
"Failed powering down node %s during power cycle task" %
n.name)
self.task.failure()
break
self.logger.debug("Sending set_power = on command to %s" % n.name)
self.exec_ipmi_command(n, Command.set_power, 'on')
i = 18
while i > 0:
power_state = self.exec_ipmi_command(n, Command.get_power)
if power_state is not None and power_state.get(
'powerstate', '') == 'on':
self.logger.debug("%s reports powerstate of on" % n.name)
break
elif power_state is None:
self.logger.debug(
"No response on IPMI power query to %s" % n.name)
time.sleep(10)
i = i - 1
if power_state is not None and (power_state.get('powerstate',
'') == 'on'):
self.task.add_status_msg(
msg="Node power cycle complete.",
error=False,
ctx=n.name,
ctx_type='node')
self.task.success()
else:
self.task.add_status_msg(
msg="Failed to power up during power cycle.",
error=True,
ctx=n.name,
ctx_type='node')
self.logger.warning(
"Failed powering up node %s during power cycle task" %
n.name)
self.task.failure()
self.task.set_status(hd_fields.TaskStatus.Complete)
self.task.save()
return
class InterrogateOob(PyghmiBaseAction):
"""Action to complete a basic interrogation of the node IPMI interface."""
def start(self):
self.task.set_status(hd_fields.TaskStatus.Running)
self.task.save()
design_status, site_design = self.orchestrator.get_effective_site(
self.task.design_ref)
node_list = self.orchestrator.process_node_filter(
self.task.node_filter, site_design)
for n in node_list:
try:
self.logger.debug(
"Interrogating node %s IPMI interface." % n.name)
mci_id = self.exec_ipmi_command(n, Command.get_mci)
self.task.add_status_msg(
msg="IPMI interface interrogation yielded MCI ID %s" %
mci_id,
error=False,
ctx=n.name,
ctx_type='node')
self.task.success()
except errors.DriverError:
self.logger.debug(
"Interrogating node %s IPMI interface failed." % n.name)
self.task.add_status_msg(
msg="IPMI interface interrogation failed.",
error=True,
ctx=n.name,
ctx_type='node')
self.task.failure()
self.task.set_status(hd_fields.TaskStatus.Complete)
self.task.save()
return
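The session setup in `exec_ipmi_command` above follows a bounded retry-then-raise pattern: try up to five times, sleep between attempts, and surface a terminal error once attempts are exhausted. A generic sketch of that shape (the helper name, delay, and exception plumbing are illustrative, not drydock API):

```python
import time


def retry_call(func, *args, attempts=5, delay=0, exceptions=(Exception,)):
    """Call func(*args), retrying up to `attempts` times on the given
    exception types and sleeping `delay` seconds between tries."""
    last_exc = None
    for _ in range(attempts):
        try:
            return func(*args)
        except exceptions as exc:
            last_exc = exc
            time.sleep(delay)
    # All attempts exhausted; surface a single terminal error.
    raise RuntimeError(
        "command failed after %d attempts" % attempts) from last_exc
```

In the driver the retried unit is larger (session init plus the IPMI call), but the control flow is the same: transient `IpmiException`s are swallowed until the attempt budget runs out.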

View File

@ -0,0 +1,159 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Driver for controlling OOB interface via IPMI.
Based on Openstack Ironic Pyghmi driver.
"""
import uuid
import logging
import concurrent.futures
from oslo_config import cfg
import drydock_provisioner.error as errors
import drydock_provisioner.config as config
import drydock_provisioner.objects.fields as hd_fields
import drydock_provisioner.drivers.oob.driver as oob_driver
import drydock_provisioner.drivers.driver as generic_driver
from .actions.oob import ValidateOobServices
from .actions.oob import ConfigNodePxe
from .actions.oob import SetNodeBoot
from .actions.oob import PowerOffNode
from .actions.oob import PowerOnNode
from .actions.oob import PowerCycleNode
from .actions.oob import InterrogateOob
class PyghmiDriver(oob_driver.OobDriver):
"""Driver for executing OOB actions via Pyghmi IPMI library."""
pyghmi_driver_options = [
cfg.IntOpt(
'poll_interval',
default=10,
help='Polling interval in seconds for querying IPMI status'),
]
oob_types_supported = ['ipmi']
driver_name = "pyghmi_driver"
driver_key = "pyghmi_driver"
driver_desc = "Pyghmi OOB Driver"
action_class_map = {
hd_fields.OrchestratorAction.ValidateOobServices: ValidateOobServices,
hd_fields.OrchestratorAction.ConfigNodePxe: ConfigNodePxe,
hd_fields.OrchestratorAction.SetNodeBoot: SetNodeBoot,
hd_fields.OrchestratorAction.PowerOffNode: PowerOffNode,
hd_fields.OrchestratorAction.PowerOnNode: PowerOnNode,
hd_fields.OrchestratorAction.PowerCycleNode: PowerCycleNode,
hd_fields.OrchestratorAction.InterrogateOob: InterrogateOob,
}
def __init__(self, **kwargs):
super().__init__(**kwargs)
cfg.CONF.register_opts(
PyghmiDriver.pyghmi_driver_options, group=PyghmiDriver.driver_key)
self.logger = logging.getLogger(
config.config_mgr.conf.logging.oobdriver_logger_name)
def execute_task(self, task_id):
task = self.state_manager.get_task(task_id)
if task is None:
self.logger.error("Invalid task %s" % (task_id))
raise errors.DriverError("Invalid task %s" % (task_id))
if task.action not in self.supported_actions:
self.logger.error("Driver %s doesn't support task action %s" %
(self.driver_desc, task.action))
raise errors.DriverError(
"Driver %s doesn't support task action %s" % (self.driver_desc,
task.action))
task.set_status(hd_fields.TaskStatus.Running)
task.save()
target_nodes = self.orchestrator.get_target_nodes(task)
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as e:
subtask_futures = dict()
for n in target_nodes:
sub_nf = self.orchestrator.create_nodefilter_from_nodelist([n])
subtask = self.orchestrator.create_task(
action=task.action,
design_ref=task.design_ref,
node_filter=sub_nf)
task.register_subtask(subtask)
self.logger.debug(
"Starting Pyghmi subtask %s for action %s on node %s" %
(str(subtask.get_id()), task.action, n.name))
action_class = self.action_class_map.get(task.action, None)
if action_class is None:
self.logger.error(
"Could not find action resource for action %s" %
task.action)
                    task.failure()
break
action = action_class(subtask, self.orchestrator,
self.state_manager)
subtask_futures[subtask.get_id().bytes] = e.submit(
action.start)
timeout = config.config_mgr.conf.timeouts.drydock_timeout
finished, running = concurrent.futures.wait(
subtask_futures.values(), timeout=(timeout * 60))
for t, f in subtask_futures.items():
if not f.done():
task.add_status_msg(
"Subtask %s timed out before completing.",
error=True,
ctx=str(uuid.UUID(bytes=t)),
ctx_type='task')
task.failure()
else:
if f.exception():
self.logger.error(
"Uncaught exception in subtask %s" % str(
uuid.UUID(bytes=t)),
exc_info=f.exception())
task.align_result()
task.set_status(hd_fields.TaskStatus.Complete)
task.save()
return
class PyghmiActionRunner(generic_driver.DriverActionRunner):
"""Threaded runner for a Pyghmi Action."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.logger = logging.getLogger(
config.config_mgr.conf.logging.oobdriver_logger_name)
def list_opts():
return {PyghmiDriver.driver_key: PyghmiDriver.pyghmi_driver_options}
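The new `execute_task` fans each target node out to a thread pool, waits on the futures with a wall-clock timeout, and then checks `done()` per future to flag stragglers. The shape of that reap loop, stripped of drydock types (the node names and work function below are illustrative):

```python
import concurrent.futures


def run_per_node(nodes, work, timeout_sec=60):
    """Run work(node) for each node in a thread pool; report which
    nodes finished and which timed out, mirroring the driver's reap loop."""
    finished, timed_out = [], []
    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as pool:
        futures = {node: pool.submit(work, node) for node in nodes}
        # Wait up to timeout_sec for all subtasks, as execute_task does.
        concurrent.futures.wait(futures.values(), timeout=timeout_sec)
        for node, fut in futures.items():
            if fut.done():
                finished.append(node)
            else:
                fut.cancel()
                timed_out.append(node)
    return finished, timed_out
```

The driver additionally logs `f.exception()` for futures that raised, so an uncaught error in one node's action cannot silently vanish inside the pool.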

View File

@ -14,15 +14,17 @@
import logging
import sys
import os
import threading
from oslo_config import cfg
from drydock_provisioner import policy
from drydock_provisioner.statemgmt.state import DrydockState
from drydock_provisioner.ingester.ingester import Ingester
from drydock_provisioner.orchestrator.orchestrator import Orchestrator
import drydock_provisioner.config as config
import drydock_provisioner.objects as objects
import drydock_provisioner.ingester as ingester
import drydock_provisioner.statemgmt as statemgmt
import drydock_provisioner.orchestrator as orch
import drydock_provisioner.control.api as api
@ -35,18 +37,19 @@ def start_drydock():
'debug', short='d', default=False, help='Enable debug logging'),
]
cfg.CONF.register_cli_opts(cli_options)
config.config_mgr.conf.register_cli_opts(cli_options)
config.config_mgr.register_options()
cfg.CONF(sys.argv[1:])
config.config_mgr.conf(sys.argv[1:])
if cfg.CONF.debug:
cfg.CONF.set_override(
if config.config_mgr.conf.debug:
config.config_mgr.conf.set_override(
name='log_level', override='DEBUG', group='logging')
# Setup root logger
logger = logging.getLogger(cfg.CONF.logging.global_logger_name)
logger = logging.getLogger(
config.config_mgr.conf.logging.global_logger_name)
logger.setLevel(cfg.CONF.logging.log_level)
logger.setLevel(config.config_mgr.conf.logging.log_level)
ch = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(filename)s:%(funcName)s - %(message)s'
@ -55,7 +58,8 @@ def start_drydock():
logger.addHandler(ch)
# Specalized format for API logging
logger = logging.getLogger(cfg.CONF.logging.control_logger_name)
logger = logging.getLogger(
config.config_mgr.conf.logging.control_logger_name)
logger.propagate = False
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(user)s - %(req_id)s - %(external_ctx)s - %(message)s'
@ -65,16 +69,24 @@ def start_drydock():
ch.setFormatter(formatter)
logger.addHandler(ch)
state = statemgmt.DesignState()
state = DrydockState()
state.connect_db()
orchestrator = orch.Orchestrator(cfg.CONF.plugins, state_manager=state)
input_ingester = ingester.Ingester()
input_ingester.enable_plugins(cfg.CONF.plugins.ingester)
input_ingester = Ingester()
input_ingester.enable_plugin(config.config_mgr.conf.plugins.ingester)
orchestrator = Orchestrator(
enabled_drivers=config.config_mgr.conf.plugins,
state_manager=state,
ingester=input_ingester)
orch_thread = threading.Thread(target=orchestrator.watch_for_tasks)
orch_thread.start()
# Check if we have an API key in the environment
# Hack around until we move MaaS configs to the YAML schema
if 'MAAS_API_KEY' in os.environ:
cfg.CONF.set_override(
config.config_mgr.conf.set_override(
name='maas_api_key',
override=os.environ['MAAS_API_KEY'],
group='maasdriver')
@ -90,8 +102,9 @@ def start_drydock():
orchestrator=orchestrator)
# Now that loggers are configured, log the effective config
cfg.CONF.log_opt_values(
logging.getLogger(cfg.CONF.logging.global_logger_name), logging.DEBUG)
config.config_mgr.conf.log_opt_values(
logging.getLogger(config.config_mgr.conf.logging.global_logger_name),
logging.DEBUG)
return wsgi_callable
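The entrypoint above now starts the orchestrator's task watcher on a plain thread next to the WSGI callable, so the API and the database-queue consumer run in one process. A minimal sketch of that poll-loop-on-a-thread shape (the in-memory queue and stop event are illustrative stand-ins for the database task queue):

```python
import queue
import threading


def watch_for_tasks(task_queue, handle, stop_event, poll_interval=0.1):
    """Poll the queue for tasks until stop_event is set, dispatching
    each task to handle -- the same shape as the orchestrator watcher."""
    while not stop_event.is_set():
        try:
            task = task_queue.get(timeout=poll_interval)
        except queue.Empty:
            continue
        handle(task)


# Start the watcher alongside the API, as drydock.py does.
tasks, seen, stop = queue.Queue(), [], threading.Event()
watcher = threading.Thread(
    target=watch_for_tasks, args=(tasks, seen.append, stop))
watcher.start()
tasks.put('deploy_nodes')
```

One caveat with the real code: the thread is started non-daemonized and without a stop signal, so process shutdown depends on the watcher loop exiting on its own.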

View File

@ -11,8 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import requests
"""REST client for Drydock API."""
import logging
from drydock_provisioner import error as errors
@ -39,93 +39,6 @@ class DrydockClient(object):
return resp.json()
def get_design_ids(self):
"""
Get list of Drydock design_ids
:return: A list of string design_ids
"""
endpoint = 'v1.0/designs'
resp = self.session.get(endpoint)
self._check_response(resp)
return resp.json()
def get_design(self, design_id, source='designed'):
"""
Get a full design based on the passed design_id
:param string design_id: A UUID design_id
:param string source: The model source to return. 'designed' is as input, 'compiled' is after merging
:return: A dict of the design and all currently loaded design parts
"""
endpoint = "v1.0/designs/%s" % design_id
resp = self.session.get(endpoint, query={'source': source})
self._check_response(resp)
return resp.json()
def create_design(self, base_design=None):
"""
Create a new design context for holding design parts
:param string base_design: String UUID of the base design to build this design upon
:return string: String UUID of the design ID
"""
endpoint = 'v1.0/designs'
if base_design is not None:
resp = self.session.post(
endpoint, data={'base_design_id': base_design})
else:
resp = self.session.post(endpoint)
self._check_response(resp)
design = resp.json()
return design.get('id', None)
def get_part(self, design_id, kind, key, source='designed'):
"""
Query the model definition of a design part
:param string design_id: The string UUID of the design context to query
:param string kind: The design part kind as defined in the Drydock design YAML schema
:param string key: The design part key, generally a name.
:param string source: The model source to return. 'designed' is as input, 'compiled' is after merging
:return: A dict of the design part
"""
endpoint = "v1.0/designs/%s/parts/%s/%s" % (design_id, kind, key)
resp = self.session.get(endpoint, query={'source': source})
self._check_response(resp)
return resp.json()
def load_parts(self, design_id, yaml_string=None):
"""
Load new design parts into a design context via YAML conforming to the Drydock design YAML schema
:param string design_id: String uuid design_id of the design context
:param string yaml_string: A single or multidoc YAML string to be ingested
:return: Dict of the parsed design parts
"""
endpoint = "v1.0/designs/%s/parts" % (design_id)
resp = self.session.post(
endpoint, query={'ingester': 'yaml'}, body=yaml_string)
self._check_response(resp)
return resp.json()
def get_tasks(self):
"""
Get a list of all the tasks, completed or running.
@ -157,22 +70,22 @@ class DrydockClient(object):
return resp.json()
def create_task(self, design_id, task_action, node_filter=None):
def create_task(self, design_ref, task_action, node_filter=None):
"""
Create a new task in Drydock
:param string design_id: A string uuid identifying the design context the task should operate on
:param string design_ref: A URI reference to the design documents for this task
:param string task_action: The action that should be executed
:param dict node_filter: A filter for narrowing the scope of the task. Valid fields are 'node_names',
'rack_names', 'node_tags'.
:return: The string uuid of the create task's id
:return: The dictionary representation of the created task
"""
endpoint = 'v1.0/tasks'
task_dict = {
'action': task_action,
'design_id': design_id,
'design_ref': design_ref,
'node_filter': node_filter,
}
@ -183,7 +96,7 @@ class DrydockClient(object):
self._check_response(resp)
return resp.json().get('task_id')
return resp.json()
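With this change the client posts a design reference URI rather than a design ID. A minimal sketch of the request body `create_task` now builds (the URI and action values are hypothetical placeholders):

```python
import json

def build_task_payload(design_ref, task_action, node_filter=None):
    # Mirrors the dict create_task posts to v1.0/tasks: a design
    # reference URI replaces the old design_id field.
    return {
        'action': task_action,
        'design_ref': design_ref,
        'node_filter': node_filter,
    }

# Hypothetical design document URI, for illustration only.
payload = build_task_payload('http://example.com/design.yaml', 'verify_site')
print(json.dumps(payload, sort_keys=True))
```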
def _check_response(self, resp):
if resp.status_code == 401:

View File

@ -17,6 +17,7 @@ import logging
from keystoneauth1 import session
from keystoneauth1.identity import v3
class DrydockSession(object):
"""
A session to the Drydock API maintaining credentials and API options
@ -89,7 +90,6 @@ class DrydockSession(object):
class KeystoneClient(object):
@staticmethod
def get_endpoint(endpoint, ks_sess=None, auth_info=None):
"""
@ -103,7 +103,8 @@ class KeystoneClient(object):
if ks_sess is None:
ks_sess = KeystoneClient.get_ks_session(**auth_info)
return ks_sess.get_endpoint(interface='internal', service_type=endpoint)
return ks_sess.get_endpoint(
interface='internal', service_type=endpoint)
@staticmethod
def get_token(ks_sess=None, auth_info=None):

View File

@ -18,6 +18,14 @@ class DesignError(Exception):
pass
class IngesterError(DesignError):
pass
class InvalidDesignReference(DesignError):
pass
class StateError(Exception):
pass
@ -30,6 +38,14 @@ class OrchestratorError(Exception):
pass
class MaxRetriesReached(OrchestratorError):
pass
class CollectSubtaskTimeout(OrchestratorError):
pass
class TransientOrchestratorError(OrchestratorError):
pass

View File

@ -11,132 +11,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ingester - Ingest host topologies to define site design and
# persist design to helm-drydock's statemgmt service
import logging
import yaml
import uuid
import importlib
import drydock_provisioner.objects as objects
import drydock_provisioner.objects.site as site
import drydock_provisioner.objects.network as network
import drydock_provisioner.objects.hwprofile as hwprofile
import drydock_provisioner.objects.node as node
import drydock_provisioner.objects.hostprofile as hostprofile
import drydock_provisioner.objects.promenade as prom
import drydock_provisioner.objects.rack as rack
from drydock_provisioner.statemgmt import DesignState
class Ingester(object):
def __init__(self):
self.logger = logging.getLogger("drydock.ingester")
self.registered_plugins = {}
def enable_plugins(self, plugins=[]):
"""
enable_plugins
:params plugins: - A list of strings naming class objects denoting the ingester plugins to be enabled
Enable plugins that can be used for ingest_data calls. Each plugin should use
drydock_provisioner.ingester.plugins.IngesterPlugin as its base class. As long as one
enabled plugin successfully initializes, the call is considered successful. Otherwise
it will throw an exception
"""
if len(plugins) == 0:
self.log.error("Cannot have an empty plugin list.")
for plugin in plugins:
try:
(module, x, classname) = plugin.rpartition('.')
if module == '':
raise Exception()
mod = importlib.import_module(module)
klass = getattr(mod, classname)
new_plugin = klass()
plugin_name = new_plugin.get_name()
self.registered_plugins[plugin_name] = new_plugin
except Exception as ex:
self.logger.error("Could not enable plugin %s - %s" %
(plugin, str(ex)))
if len(self.registered_plugins) == 0:
self.logger.error("Could not enable at least one plugin")
raise Exception("Could not enable at least one plugin")
def ingest_data(self,
plugin_name='',
design_state=None,
design_id=None,
context=None,
**kwargs):
"""
ingest_data - Execute a data ingestion using the named plugin (assuming it is enabled)
:param plugin_name: - Which plugin should be used for ingestion
:param design_state: - An instance of statemgmt.DesignState
:param design_id: - The ID of the SiteDesign all parsed designed parts should be added
:param context: - Context of the request requesting ingestion
:param kwargs: - Keywork arguments to pass to the ingester plugin
"""
if design_state is None:
self.logger.error(
"Ingester:ingest_data called without valid DesignState handler"
)
raise ValueError("Invalid design_state handler")
# If no design_id is specified, instantiate a new one
if 'design_id' is None:
self.logger.error(
"Ingester:ingest_data required kwarg 'design_id' missing")
raise ValueError(
"Ingester:ingest_data required kwarg 'design_id' missing")
design_data = design_state.get_design(design_id)
self.logger.debug(
"Ingester:ingest_data ingesting design parts for design %s" %
design_id)
if plugin_name in self.registered_plugins:
try:
design_items = self.registered_plugins[
plugin_name].ingest_data(**kwargs)
except ValueError as vex:
self.logger.warn(
"Ingester:ingest_data - Error process data - %s" %
(str(vex)))
return None
self.logger.debug("Ingester:ingest_data parsed %s design parts" %
str(len(design_items)))
for m in design_items:
if context is not None:
m.set_create_fields(context)
if type(m) is site.Site:
design_data.set_site(m)
elif type(m) is network.Network:
design_data.add_network(m)
elif type(m) is network.NetworkLink:
design_data.add_network_link(m)
elif type(m) is hostprofile.HostProfile:
design_data.add_host_profile(m)
elif type(m) is hwprofile.HardwareProfile:
design_data.add_hardware_profile(m)
elif type(m) is node.BaremetalNode:
design_data.add_baremetal_node(m)
elif type(m) is prom.PromenadeConfig:
design_data.add_promenade_config(m)
elif type(m) is rack.Rack:
design_data.add_rack(m)
design_state.put_design(design_data)
return design_items
else:
self.logger.error("Could not find plugin %s to ingest data." %
(plugin_name))
raise LookupError("Could not find plugin %s" % plugin_name)
"""Ingest host topologies to define site design."""

View File

@ -0,0 +1,122 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Ingest host topologies to define site design."""
import logging
import importlib
from drydock_provisioner import error as errors
import drydock_provisioner.objects as objects
import drydock_provisioner.objects.site as site
import drydock_provisioner.objects.network as network
import drydock_provisioner.objects.hwprofile as hwprofile
import drydock_provisioner.objects.node as node
import drydock_provisioner.objects.hostprofile as hostprofile
import drydock_provisioner.objects.promenade as prom
import drydock_provisioner.objects.rack as rack
class Ingester(object):
def __init__(self):
self.logger = logging.getLogger("drydock.ingester")
self.registered_plugin = None
def enable_plugin(self, plugin):
"""Enable a ingester plugin for use parsing design documents.
:params plugin: - A string naming a class object denoting the ingester plugin to be enabled
"""
if plugin is None or plugin == '':
self.logger.error("Cannot have an empty plugin string.")
try:
(module, x, classname) = plugin.rpartition('.')
if module == '':
raise Exception()
mod = importlib.import_module(module)
klass = getattr(mod, classname)
self.registered_plugin = klass()
except Exception as ex:
self.logger.error("Could not enable plugin %s - %s" % (plugin,
str(ex)))
if self.registered_plugin is None:
self.logger.error("Could not enable at least one plugin")
raise Exception("Could not enable at least one plugin")
def ingest_data(self,
design_state=None,
design_ref=None,
context=None,
**kwargs):
"""Execute a data ingestion of the design reference.
Return a tuple of the processing status and an instance of objects.SiteDesign
populated with all the processed design parts.
:param design_state: - An instance of statemgmt.state.DrydockState
:param design_ref: - The design reference to source design data from
:param context: - Context of the request requesting ingestion
:param kwargs: - Keyword arguments to pass to the ingester plugin
"""
if design_state is None:
self.logger.error(
"Ingester:ingest_data called without valid DrydockState handler"
)
raise ValueError("Invalid design_state handler")
# A design_ref is required to locate the design documents
if design_ref is None:
self.logger.error(
"Ingester:ingest_data required kwarg 'design_ref' missing")
raise ValueError(
"Ingester:ingest_data required kwarg 'design_ref' missing")
design_blob = design_state.get_design_documents(design_ref)
self.logger.debug(
"Ingester:ingest_data ingesting design parts for design %s" %
design_ref)
try:
status, design_items = self.registered_plugin.ingest_data(
content=design_blob, **kwargs)
except errors.IngesterError as vex:
self.logger.warn(
"Ingester:ingest_data - Unexpected error processing data - %s"
% (str(vex)))
return None, None
self.logger.debug("Ingester:ingest_data parsed %s design parts" % str(
len(design_items)))
design_data = objects.SiteDesign()
for m in design_items:
if context is not None:
m.set_create_fields(context)
if type(m) is site.Site:
design_data.set_site(m)
elif type(m) is network.Network:
design_data.add_network(m)
elif type(m) is network.NetworkLink:
design_data.add_network_link(m)
elif type(m) is hostprofile.HostProfile:
design_data.add_host_profile(m)
elif type(m) is hwprofile.HardwareProfile:
design_data.add_hardware_profile(m)
elif type(m) is node.BaremetalNode:
design_data.add_baremetal_node(m)
elif type(m) is prom.PromenadeConfig:
design_data.add_promenade_config(m)
elif type(m) is rack.Rack:
design_data.add_rack(m)
return status, design_data

File diff suppressed because it is too large

View File

@ -11,10 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Models for drydock_provisioner
#
import logging
"""Object models for Drydock design and task data."""
import importlib
from copy import deepcopy

View File

@ -26,10 +26,10 @@ class OrchestratorAction(BaseDrydockEnum):
ValidateDesign = 'validate_design'
VerifySite = 'verify_site'
PrepareSite = 'prepare_site'
VerifyNode = 'verify_node'
PrepareNode = 'prepare_node'
DeployNode = 'deploy_node'
DestroyNode = 'destroy_node'
VerifyNodes = 'verify_nodes'
PrepareNodes = 'prepare_nodes'
DeployNodes = 'deploy_nodes'
DestroyNodes = 'destroy_nodes'
# OOB driver actions
ValidateOobServices = 'validate_oob_services'
@ -62,8 +62,8 @@ class OrchestratorAction(BaseDrydockEnum):
ConfigurePortProvisioning = 'config_port_provisioning'
ConfigurePortProduction = 'config_port_production'
ALL = (Noop, ValidateDesign, VerifySite, PrepareSite, VerifyNode,
PrepareNode, DeployNode, DestroyNode, ConfigNodePxe, SetNodeBoot,
ALL = (Noop, ValidateDesign, VerifySite, PrepareSite, VerifyNodes,
PrepareNodes, DeployNodes, DestroyNodes, ConfigNodePxe, SetNodeBoot,
PowerOffNode, PowerOnNode, PowerCycleNode, InterrogateOob,
CreateNetworkTemplate, CreateStorageTemplate, CreateBootMedia,
PrepareHardwareConfig, ConfigureHardware, InterrogateNode,

View File

@ -11,8 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from copy import deepcopy
"""Object models for HardwareProfile and constituents."""
from oslo_versionedobjects import fields as ovo_fields

View File

@ -11,16 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Models for drydock_provisioner
#
import logging
from copy import deepcopy
"""Object models for Network and Networklink."""
import oslo_versionedobjects.fields as ovo_fields
import drydock_provisioner.objects as objects
import drydock_provisioner.objects.base as base
import drydock_provisioner.objects.fields as hd_fields

View File

@ -11,12 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Object models for Promenade configs."""
from oslo_versionedobjects import fields as ovo_fields
import drydock_provisioner.objects as objects
import drydock_provisioner.objects.base as base
import drydock_provisioner.objects.fields as hd_fields
@base.DrydockObjectRegistry.register

View File

@ -11,10 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Models for drydock_provisioner
#
from copy import deepcopy
"""Object models for a Region and the combined site design."""
import uuid
import datetime

View File

@ -15,6 +15,8 @@
import uuid
import json
import time
import logging
from datetime import datetime
@ -38,6 +40,7 @@ class Task(object):
:param context: instance of DrydockRequestContext representing the request context the
task is executing under
:param statemgr: instance of AppState used to access the database for state management
:param retry: integer retry sequence
"""
def __init__(self,
@ -46,7 +49,8 @@ class Task(object):
parent_task_id=None,
node_filter=None,
context=None,
statemgr=None):
statemgr=None,
retry=0):
self.statemgr = statemgr
self.task_id = uuid.uuid4()
@ -55,6 +59,7 @@ class Task(object):
self.result = TaskStatus()
self.action = action or hd_fields.OrchestratorAction.Noop
self.design_ref = design_ref
self.retry = retry
self.parent_task_id = parent_task_id
self.created = datetime.utcnow()
self.node_filter = node_filter
@ -63,6 +68,8 @@ class Task(object):
self.terminated = None
self.terminated_by = None
self.request_context = context
self.terminate = False
self.logger = logging.getLogger("drydock")
if context is not None:
self.created_by = context.user
@ -74,8 +81,48 @@ class Task(object):
def get_id(self):
return self.task_id
def terminate_task(self):
self.set_Status(hd_fields.TaskStatus.Terminating)
def retry_task(self, max_attempts=None):
"""Check if this task should be retried and update attributes if so."""
if (self.result.status != hd_fields.ActionResult.Success) and (len(
self.result.failures) > 0):
if not max_attempts or (max_attempts
and self.retry < max_attempts):
self.add_status_msg(
msg="Retrying task for failed entities.",
error=False,
ctx='NA',
ctx_type='NA')
self.retry = self.retry + 1
if len(self.result.successes) > 0:
self.result.status = hd_fields.ActionResult.Success
else:
self.result.status = hd_fields.ActionResult.Incomplete
self.save()
return True
else:
self.add_status_msg(
msg="Retry requested, out of attempts.",
error=False,
ctx='NA',
ctx_type='NA')
raise errors.MaxRetriesReached("Retries reached max attempts.")
else:
return False
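The retry decision above can be reduced to a standalone sketch (statuses are simplified to plain strings here, not the hd_fields enums):

```python
def should_retry(result_status, failures, retry, max_attempts=None):
    # Simplified decision from retry_task: retry only when the last
    # pass was not a full success and some entities failed; a
    # max_attempts of None means retries are unbounded.
    if result_status == 'success' or not failures:
        return False
    if max_attempts is not None and retry >= max_attempts:
        raise RuntimeError("Retries reached max attempts.")
    return True
```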
def terminate_task(self, terminated_by=None):
"""Terminate this task.
If the task is queued, just mark it terminated. Otherwise mark it as
terminating and let the orchestrator manage completing termination.
"""
self.terminate = True
self.terminated = datetime.utcnow()
self.terminated_by = terminated_by
self.save()
def check_terminate(self):
"""Check if execution of this task should terminate."""
return self.terminate
def set_status(self, status):
self.status = status
@ -84,23 +131,44 @@ class Task(object):
return self.status
def get_result(self):
return self.result
return self.result.status
def success(self):
"""Encounter a result that causes at least partial success."""
if self.result.status in [hd_fields.TaskResult.Failure,
hd_fields.TaskResult.PartialSuccess]:
self.result.status = hd_fields.TaskResult.PartialSuccess
else:
self.result.status = hd_fields.TaskResult.Success
def success(self, focus=None):
"""Encounter a result that causes at least partial success.
def failure(self):
"""Encounter a result that causes at least partial failure."""
if self.result.status in [hd_fields.TaskResult.Success,
hd_fields.TaskResult.PartialSuccess]:
self.result.status = hd_fields.TaskResult.PartialSuccess
If defined, focus will be added to the task successes list
:param focus: The entity successfully operated upon
"""
if self.result.status in [
hd_fields.ActionResult.Failure,
hd_fields.ActionResult.PartialSuccess
]:
self.result.status = hd_fields.ActionResult.PartialSuccess
else:
self.result.status = hd_fields.TaskResult.Failure
self.result.status = hd_fields.ActionResult.Success
if focus:
self.logger.debug("Adding %s to successes list." % focus)
self.result.add_success(focus)
return
def failure(self, focus=None):
"""Encounter a result that causes at least partial failure.
If defined, focus will be added to the task failures list
:param focus: The entity that failed the operation
"""
if self.result.status in [
hd_fields.ActionResult.Success,
hd_fields.ActionResult.PartialSuccess
]:
self.result.status = hd_fields.ActionResult.PartialSuccess
else:
self.result.status = hd_fields.ActionResult.Failure
if focus:
self.logger.debug("Adding %s to failures list." % focus)
self.result.add_failure(focus)
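The status transitions in success() and failure() form a small lattice: once a task has seen both outcomes it settles at partial success. A standalone sketch, with plain strings in place of hd_fields.ActionResult:

```python
def apply_outcome(current, outcome):
    # Mixing a success with a prior failure (or vice versa) yields
    # partial_success; repeating the same outcome is a no-op.
    opposite = 'failure' if outcome == 'success' else 'success'
    if current in (opposite, 'partial_success'):
        return 'partial_success'
    return outcome

status = 'incomplete'
for o in ('success', 'failure'):
    status = apply_outcome(status, o)
```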
def register_subtask(self, subtask):
"""Register a task as a subtask to this task.
@ -111,6 +179,12 @@ class Task(object):
raise errors.OrchestratorError("Cannot add subtask for parent"
" marked for termination")
if self.statemgr.add_subtask(self.task_id, subtask.task_id):
self.add_status_msg(
msg="Started subtask %s for action %s" %
(str(subtask.get_id()), subtask.action),
error=False,
ctx=str(self.get_id()),
ctx_type='task')
self.subtask_id_list.append(subtask.task_id)
subtask.parent_task_id = self.task_id
subtask.save()
@ -119,16 +193,131 @@ class Task(object):
def save(self):
"""Save this task's current state to the database."""
chk_task = self.statemgr.get_task(self.get_id())
if chk_task and chk_task.status in [
hd_fields.TaskStatus.Terminating,
hd_fields.TaskStatus.Terminated
]:
self.set_status(chk_task.status)
self.updated = datetime.utcnow()
if not self.statemgr.put_task(self):
raise errors.OrchestratorError("Error saving task.")
def get_subtasks(self):
"""Get list of this task's subtasks."""
return self.subtask_id_list
def collect_subtasks(self, action=None, poll_interval=15, timeout=300):
"""Monitor subtasks waiting for completion.
If action is specified, only watch subtasks executing this action. poll_interval
and timeout are measured in seconds and used for controlling the monitoring behavior.
:param action: What subtask action to monitor
:param poll_interval: How often to load subtask status from the database
:param timeout: How long to continue monitoring before considering subtasks as hung
"""
timeleft = timeout
while timeleft > 0:
st_list = self.statemgr.get_active_subtasks(self.task_id)
if len(st_list) == 0:
return True
else:
time.sleep(poll_interval)
timeleft = timeleft - poll_interval
raise errors.CollectSubtaskTimeout(
"Timed out collecting subtasks for task %s." % str(self.task_id))
def node_filter_from_successes(self):
"""Create a node filter from successful entities in this task's results."""
nf = dict(filter_set_type='intersection', filter_set=[])
nf['filter_set'].append(
dict(node_names=self.result.successes, filter_type='union'))
return nf
def node_filter_from_failures(self):
"""Create a node filter from failure entities in this task's result."""
nf = dict(filter_set_type='intersection', filter_set=[])
nf['filter_set'].append(
dict(node_names=self.result.failures, filter_type='union'))
return nf
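The node filter these helpers build has a fixed shape: an intersection of filter sets, each a union over node names. A sketch with hypothetical node names:

```python
def node_filter_from(node_names):
    # Matches the structure built by node_filter_from_successes and
    # node_filter_from_failures.
    nf = dict(filter_set_type='intersection', filter_set=[])
    nf['filter_set'].append(dict(node_names=node_names, filter_type='union'))
    return nf

nf = node_filter_from(['node01', 'node02'])  # hypothetical node names
```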
def bubble_results(self, action_filter=None):
"""Combine successes and failures of subtasks and update this task with the result.
Query all completed subtasks of this task and collect the success and failure entities
from the subtask result. If action_filter is specified, collect results only from
subtasks performing the given action. Replace this task's result failures and successes
with the results of the query. If this task has a ``retry`` sequence greater than 0,
collect failures from subtasks only with an equivalent retry sequence.
:param action_filter: string action name to filter subtasks on
"""
self.logger.debug(
"Bubbling subtask results up to task %s." % str(self.task_id))
self.result.successes = []
self.result.failures = []
for st in self.statemgr.get_complete_subtasks(self.task_id):
if action_filter is None or (action_filter is not None
and st.action == action_filter):
for se in st.result.successes:
self.logger.debug(
"Bubbling subtask success for entity %s." % se)
self.result.add_success(se)
if self.retry == 0 or (self.retry == st.retry):
for fe in st.result.failures:
self.logger.debug(
"Bubbling subtask failure for entity %s." % fe)
self.result.add_failure(fe)
else:
self.logger.debug(
"Skipping failures as they mismatch task retry sequence."
)
else:
self.logger.debug("Skipping subtask due to action filter.")
def align_result(self):
"""Align the result of this task with the combined results of all the subtasks."""
for st in self.statemgr.get_complete_subtasks(self.task_id):
if st.get_result() in [
hd_fields.ActionResult.Success,
hd_fields.ActionResult.PartialSuccess
]:
self.success()
if st.get_result() in [
hd_fields.ActionResult.Failure,
hd_fields.ActionResult.PartialSuccess
]:
self.failure()
def add_status_msg(self, **kwargs):
"""Add a status message to this task's result status."""
msg = self.result.add_status_msg(**kwargs)
self.statemgr.post_result_message(self.task_id, msg)
def merge_status_messages(self, task=None, task_result=None):
"""Merge status messages into this task's result status.
Specify either task or task_result to source status messages from.
:param task: instance of objects.task.Task to consume result messages from
:param task_result: instance of objects.task.TaskStatus to consume result message from
"""
if task:
msg_list = task.result.message_list
elif task_result:
msg_list = task_result.message_list
for m in msg_list:
self.add_status_msg(
msg=m.msg,
error=m.error,
ctx_type=m.ctx_type,
ctx=m.ctx,
ts=m.ts,
**m.extra)
def to_db(self, include_id=True):
"""Convert this instance to a dictionary for use persisting to a db.
@ -150,6 +339,10 @@ class Task(object):
self.result.reason,
'result_error_count':
self.result.error_count,
'result_successes':
self.result.successes,
'result_failures':
self.result.failures,
'status':
self.status,
'created':
@ -169,6 +362,10 @@ class Task(object):
self.terminated,
'terminated_by':
self.terminated_by,
'terminate':
self.terminate,
'retry':
self.retry
}
if include_id:
@ -182,21 +379,39 @@ class Task(object):
Intended for use in JSON serialization
"""
return {
'Kind': 'Task',
'apiVersion': 'v1',
'task_id': str(self.task_id),
'action': self.action,
'parent_task_id': str(self.parent_task_id),
'design_ref': self.design_ref,
'status': self.status,
'result': self.result.to_dict(),
'node_filter': self.node_filter.to_dict(),
'kind':
'Task',
'apiVersion':
'v1',
'task_id':
str(self.task_id),
'action':
self.action,
'parent_task_id':
None if self.parent_task_id is None else str(self.parent_task_id),
'design_ref':
self.design_ref,
'status':
self.status,
'result':
self.result.to_dict(),
'node_filter':
None if self.node_filter is None else self.node_filter,
'subtask_id_list': [str(x) for x in self.subtask_id_list],
'created': self.created,
'created_by': self.created_by,
'updated': self.updated,
'terminated': self.terminated,
'terminated_by': self.terminated_by,
'created':
None if self.created is None else str(self.created),
'created_by':
self.created_by,
'updated':
None if self.updated is None else str(self.updated),
'terminated':
None if self.terminated is None else str(self.terminated),
'terminated_by':
self.terminated_by,
'terminate':
self.terminate,
'retry':
self.retry,
}
@classmethod
@ -207,23 +422,40 @@ class Task(object):
"""
i = Task()
i.task_id = uuid.UUID(bytes=d.get('task_id'))
i.task_id = uuid.UUID(bytes=bytes(d.get('task_id')))
if d.get('parent_task_id', None) is not None:
i.parent_task_id = uuid.UUID(bytes=d.get('parent_task_id'))
i.parent_task_id = uuid.UUID(bytes=bytes(d.get('parent_task_id')))
if d.get('subtask_id_list', None) is not None:
for t in d.get('subtask_id_list'):
i.subtask_id_list.append(uuid.UUID(bytes=t))
i.subtask_id_list.append(uuid.UUID(bytes=bytes(t)))
simple_fields = [
'status', 'created', 'created_by', 'design_ref', 'action',
'terminated', 'terminated_by'
'status',
'created',
'created_by',
'design_ref',
'action',
'terminated',
'terminated_by',
'terminate',
'updated',
'retry',
]
for f in simple_fields:
setattr(i, f, d.get(f, None))
# Recreate result
i.result = TaskStatus()
i.result.error_count = d.get('result_error_count')
i.result.message = d.get('result_message')
i.result.reason = d.get('result_reason')
i.result.status = d.get('result_status')
i.result.successes = d.get('result_successes', [])
i.result.failures = d.get('result_failures', [])
# Deserialize the request context for this task
if i.request_context is not None:
i.request_context = DrydockRequestContext.from_dict(
@ -243,6 +475,11 @@ class TaskStatus(object):
self.reason = None
self.status = hd_fields.ActionResult.Incomplete
# For tasks operating on multiple contexts (nodes, networks, etc...)
# track which contexts ended successfully and which failed
self.successes = []
self.failures = []
@classmethod
def obj_name(cls):
return cls.__name__
@ -256,6 +493,22 @@ class TaskStatus(object):
def set_status(self, status):
self.status = status
def add_failure(self, entity):
"""Add an entity to the failures list.
:param entity: String entity name to add
"""
if entity not in self.failures:
self.failures.append(entity)
def add_success(self, entity):
"""Add an entity to the successes list.
:param entity: String entity name to add
"""
if entity not in self.successes:
self.successes.append(entity)
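A minimal sketch of this dedup behavior, with a stand-in class for TaskStatus's entity tracking:

```python
class ResultLists:
    """Minimal stand-in for TaskStatus's success/failure tracking."""

    def __init__(self):
        self.successes = []
        self.failures = []

    def add_success(self, entity):
        # Entities are recorded at most once, as in add_success above.
        if entity not in self.successes:
            self.successes.append(entity)

    def add_failure(self, entity):
        if entity not in self.failures:
            self.failures.append(entity)

r = ResultLists()
r.add_success('node01')
r.add_success('node01')  # duplicate is ignored
r.add_failure('node02')
```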
def add_status_msg(self,
msg=None,
error=None,
@ -277,12 +530,14 @@ class TaskStatus(object):
def to_dict(self):
return {
'Kind': 'Status',
'kind': 'Status',
'apiVersion': 'v1',
'metadata': {},
'message': self.message,
'reason': self.reason,
'status': self.status,
'successes': self.successes,
'failures': self.failures,
'details': {
'errorCount': self.error_count,
'messageList': [x.to_dict() for x in self.message_list],
@ -298,7 +553,10 @@ class TaskStatusMessage(object):
self.error = error
self.ctx_type = ctx_type
self.ctx = ctx
self.ts = datetime.utcnow()
if 'ts' not in kwargs:
self.ts = datetime.utcnow()
else:
self.ts = kwargs.pop('ts')
self.extra = kwargs
@classmethod
@ -306,21 +564,20 @@ class TaskStatusMessage(object):
return cls.__name__
def to_dict(self):
"""Convert to a dictionary in prep for JSON/YAML serialization."""
_dict = {
'message': self.message,
'error': self.error,
'context_type': self.ctx_type,
'context': self.ctx,
'ts': self.ts,
'ts': str(self.ts),
'extra': self.extra,
}
_dict.update(self.extra)
return _dict
def to_db(self):
"""Convert this instance to a dictionary appropriate for the DB."""
return {
_dict = {
'message': self.message,
'error': self.error,
'context': self.ctx,
@ -329,6 +586,11 @@ class TaskStatusMessage(object):
'extra': json.dumps(self.extra),
}
if len(_dict['message']) > 128:
_dict['message'] = _dict['message'][:127]
return _dict
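The message-column guard above keeps only the first 127 characters of over-long messages. Sketched standalone:

```python
def clamp_message(msg, limit=128):
    # Matches the guard in to_db: messages longer than the column
    # width are clipped to limit - 1 characters.
    if len(msg) > limit:
        return msg[:limit - 1]
    return msg

clipped = clamp_message('x' * 200)
```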
@classmethod
def from_db(cls, d):
"""Create instance from DB-based dictionary.
@ -337,8 +599,7 @@ class TaskStatusMessage(object):
"""
i = TaskStatusMessage(
d.get('message', None),
d.get('error'),
d.get('context_type'), d.get('context'))
d.get('error'), d.get('context_type'), d.get('context'))
if 'extra' in d:
i.extra = d.get('extra')
i.ts = d.get('ts', None)

View File

@ -11,829 +11,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import importlib
import logging
from oslo_config import cfg
import drydock_provisioner.drivers as drivers
import drydock_provisioner.objects.task as tasks
import drydock_provisioner.error as errors
import drydock_provisioner.objects.fields as hd_fields
class Orchestrator(object):
# enabled_drivers is a map which provider drivers
# should be enabled for use by this orchestrator
def __init__(self, enabled_drivers=None, state_manager=None):
self.enabled_drivers = {}
self.state_manager = state_manager
self.logger = logging.getLogger('drydock.orchestrator')
if enabled_drivers is not None:
oob_drivers = enabled_drivers.oob_driver
# This is because oslo_config changes the option value
# for multiopt depending on if multiple values are actually defined
for d in oob_drivers:
self.logger.info("Enabling OOB driver %s" % d)
if d is not None:
m, c = d.rsplit('.', 1)
oob_driver_class = \
getattr(importlib.import_module(m), c, None)
if oob_driver_class is not None:
if self.enabled_drivers.get('oob', None) is None:
self.enabled_drivers['oob'] = []
self.enabled_drivers['oob'].append(
oob_driver_class(
state_manager=state_manager,
orchestrator=self))
node_driver_name = enabled_drivers.node_driver
if node_driver_name is not None:
m, c = node_driver_name.rsplit('.', 1)
node_driver_class = \
getattr(importlib.import_module(m), c, None)
if node_driver_class is not None:
self.enabled_drivers['node'] = node_driver_class(
state_manager=state_manager, orchestrator=self)
network_driver_name = enabled_drivers.network_driver
if network_driver_name is not None:
m, c = network_driver_name.rsplit('.', 1)
network_driver_class = \
getattr(importlib.import_module(m), c, None)
if network_driver_class is not None:
self.enabled_drivers['network'] = network_driver_class(
state_manager=state_manager, orchestrator=self)
"""
execute_task
This is the core of the orchestrator. The task will describe the action
to take and the context/scope of the command. We will then source
the current designed state and current built state from the statemgmt
module. Based on those 3 inputs, we'll decide what is needed next.
"""
def execute_task(self, task_id):
if self.state_manager is None:
raise errors.OrchestratorError("Cannot execute task without an"
" initialized state manager")
task = self.state_manager.get_task(task_id)
if task is None:
raise errors.OrchestratorError("Task %s not found." % (task_id))
design_id = task.design_id
# Just for testing now, need to implement with enabled_drivers
# logic
if task.action == hd_fields.OrchestratorAction.Noop:
self.task_field_update(
task_id, status=hd_fields.TaskStatus.Running)
driver_task = self.create_task(
tasks.DriverTask,
design_id=0,
action=hd_fields.OrchestratorAction.Noop,
parent_task_id=task.get_id())
driver = drivers.ProviderDriver(
state_manager=self.state_manager, orchestrator=self)
driver.execute_task(driver_task.get_id())
driver_task = self.state_manager.get_task(driver_task.get_id())
self.task_field_update(task_id, status=driver_task.get_status())
return
elif task.action == hd_fields.OrchestratorAction.ValidateDesign:
self.task_field_update(
task_id, status=hd_fields.TaskStatus.Running)
try:
site_design = self.get_effective_site(design_id)
self.task_field_update(
task_id, result=hd_fields.ActionResult.Success)
except Exception:
self.task_field_update(
task_id, result=hd_fields.ActionResult.Failure)
self.task_field_update(
task_id, status=hd_fields.TaskStatus.Complete)
return
elif task.action == hd_fields.OrchestratorAction.VerifySite:
self.task_field_update(
task_id, status=hd_fields.TaskStatus.Running)
node_driver = self.enabled_drivers['node']
if node_driver is not None:
node_driver_task = self.create_task(
tasks.DriverTask,
parent_task_id=task.get_id(),
design_id=design_id,
action=hd_fields.OrchestratorAction.ValidateNodeServices)
node_driver.execute_task(node_driver_task.get_id())
node_driver_task = self.state_manager.get_task(
node_driver_task.get_id())
self.task_field_update(
task_id,
status=hd_fields.TaskStatus.Complete,
result=node_driver_task.get_result())
return
elif task.action == hd_fields.OrchestratorAction.PrepareSite:
driver = self.enabled_drivers['node']
if driver is None:
self.task_field_update(
task_id,
status=hd_fields.TaskStatus.Errored,
result=hd_fields.ActionResult.Failure)
return
worked = failed = False
site_network_task = self.create_task(
tasks.DriverTask,
parent_task_id=task.get_id(),
design_id=design_id,
action=hd_fields.OrchestratorAction.CreateNetworkTemplate)
self.logger.info(
"Starting node driver task %s to create network templates" %
(site_network_task.get_id()))
driver.execute_task(site_network_task.get_id())
site_network_task = self.state_manager.get_task(
site_network_task.get_id())
if site_network_task.get_result() in [
hd_fields.ActionResult.Success,
hd_fields.ActionResult.PartialSuccess
]:
worked = True
if site_network_task.get_result() in [
hd_fields.ActionResult.Failure,
hd_fields.ActionResult.PartialSuccess
]:
failed = True
self.logger.info("Node driver task %s complete" %
(site_network_task.get_id()))
user_creds_task = self.create_task(
tasks.DriverTask,
parent_task_id=task.get_id(),
design_id=design_id,
action=hd_fields.OrchestratorAction.ConfigureUserCredentials)
self.logger.info(
"Starting node driver task %s to configure user credentials" %
(user_creds_task.get_id()))
driver.execute_task(user_creds_task.get_id())
self.logger.info("Node driver task %s complete" %
(user_creds_task.get_id()))
user_creds_task = self.state_manager.get_task(
user_creds_task.get_id())
if user_creds_task.get_result() in [
hd_fields.ActionResult.Success,
hd_fields.ActionResult.PartialSuccess
]:
worked = True
if user_creds_task.get_result() in [
hd_fields.ActionResult.Failure,
hd_fields.ActionResult.PartialSuccess
]:
failed = True
if worked and failed:
final_result = hd_fields.ActionResult.PartialSuccess
elif worked:
final_result = hd_fields.ActionResult.Success
else:
final_result = hd_fields.ActionResult.Failure
self.task_field_update(
task_id,
status=hd_fields.TaskStatus.Complete,
result=final_result)
return
elif task.action == hd_fields.OrchestratorAction.VerifyNode:
self.task_field_update(
task_id, status=hd_fields.TaskStatus.Running)
site_design = self.get_effective_site(design_id)
node_filter = task.node_filter
oob_type_partition = {}
target_nodes = self.process_node_filter(node_filter, site_design)
for n in target_nodes:
if n.oob_type not in oob_type_partition.keys():
oob_type_partition[n.oob_type] = []
oob_type_partition[n.oob_type].append(n)
result_detail = {'detail': []}
worked = failed = False
# TODO(sh8121att) Need to multithread tasks for different OOB types
for oob_type, oob_nodes in oob_type_partition.items():
oob_driver = None
for d in self.enabled_drivers['oob']:
if d.oob_type_support(oob_type):
oob_driver = d
break
if oob_driver is None:
self.logger.warning(
"Node OOB type %s has no enabled driver." % oob_type)
result_detail['detail'].append(
"Error: No oob driver configured for type %s" %
oob_type)
continue
target_names = [x.get_name() for x in oob_nodes]
task_scope = {'node_names': target_names}
oob_driver_task = self.create_task(
tasks.DriverTask,
parent_task_id=task.get_id(),
design_id=design_id,
action=hd_fields.OrchestratorAction.InterrogateOob,
task_scope=task_scope)
self.logger.info(
"Starting task %s for node verification via OOB type %s" %
(oob_driver_task.get_id(), oob_type))
oob_driver.execute_task(oob_driver_task.get_id())
oob_driver_task = self.state_manager.get_task(
oob_driver_task.get_id())
if oob_driver_task.get_result() in [
hd_fields.ActionResult.Success,
hd_fields.ActionResult.PartialSuccess
]:
worked = True
if oob_driver_task.get_result() in [
hd_fields.ActionResult.Failure,
hd_fields.ActionResult.PartialSuccess
]:
failed = True
final_result = None
if worked and failed:
final_result = hd_fields.ActionResult.PartialSuccess
elif worked:
final_result = hd_fields.ActionResult.Success
else:
final_result = hd_fields.ActionResult.Failure
self.task_field_update(
task_id,
status=hd_fields.TaskStatus.Complete,
result=final_result,
result_detail=result_detail)
return
elif task.action == hd_fields.OrchestratorAction.PrepareNode:
failed = worked = False
self.task_field_update(
task_id, status=hd_fields.TaskStatus.Running)
# NOTE Should we attempt to interrogate the node via Node
# Driver to see if it is in a deployed state before we
# start rebooting? Or do we just leverage
# Drydock internal state via site build data (when implemented)?
node_driver = self.enabled_drivers['node']
if node_driver is None:
self.task_field_update(
task_id,
status=hd_fields.TaskStatus.Errored,
result=hd_fields.ActionResult.Failure,
result_detail={
'detail': 'Error: No node driver configured',
'retry': False
})
return
site_design = self.get_effective_site(design_id)
node_filter = task.node_filter
target_nodes = self.process_node_filter(node_filter, site_design)
oob_type_partition = {}
for n in target_nodes:
if n.oob_type not in oob_type_partition.keys():
oob_type_partition[n.oob_type] = []
oob_type_partition[n.oob_type].append(n)
result_detail = {'detail': []}
worked = failed = False
# TODO(sh8121att) Need to multithread tasks for different OOB types
for oob_type, oob_nodes in oob_type_partition.items():
oob_driver = None
for d in self.enabled_drivers['oob']:
if d.oob_type_support(oob_type):
oob_driver = d
break
if oob_driver is None:
self.logger.warning(
"Node OOB type %s has no enabled driver." % oob_type)
result_detail['detail'].append(
"Error: No oob driver configured for type %s" %
oob_type)
continue
target_names = [x.get_name() for x in oob_nodes]
task_scope = {'node_names': target_names}
setboot_task = self.create_task(
tasks.DriverTask,
parent_task_id=task.get_id(),
design_id=design_id,
action=hd_fields.OrchestratorAction.SetNodeBoot,
task_scope=task_scope)
self.logger.info(
"Starting OOB driver task %s to set PXE boot for OOB type %s"
% (setboot_task.get_id(), oob_type))
oob_driver.execute_task(setboot_task.get_id())
self.logger.info("OOB driver task %s complete" %
(setboot_task.get_id()))
setboot_task = self.state_manager.get_task(
setboot_task.get_id())
if setboot_task.get_result() == hd_fields.ActionResult.Success:
worked = True
elif setboot_task.get_result(
) == hd_fields.ActionResult.PartialSuccess:
worked = failed = True
elif setboot_task.get_result(
) == hd_fields.ActionResult.Failure:
failed = True
cycle_task = self.create_task(
tasks.DriverTask,
parent_task_id=task.get_id(),
design_id=design_id,
action=hd_fields.OrchestratorAction.PowerCycleNode,
task_scope=task_scope)
self.logger.info(
"Starting OOB driver task %s to power cycle nodes for OOB type %s"
% (cycle_task.get_id(), oob_type))
oob_driver.execute_task(cycle_task.get_id())
self.logger.info("OOB driver task %s complete" %
(cycle_task.get_id()))
cycle_task = self.state_manager.get_task(cycle_task.get_id())
if cycle_task.get_result() == hd_fields.ActionResult.Success:
worked = True
elif cycle_task.get_result(
) == hd_fields.ActionResult.PartialSuccess:
worked = failed = True
elif cycle_task.get_result() == hd_fields.ActionResult.Failure:
failed = True
# IdentifyNode success will take some time after PowerCycleNode finishes
# Retry the operation a few times if it fails before considering it a final failure
# Each attempt is a new task which might make the final task tree a bit confusing
node_identify_attempts = 0
max_attempts = cfg.CONF.timeouts.identify_node * (
60 / cfg.CONF.poll_interval)
while True:
node_identify_task = self.create_task(
tasks.DriverTask,
parent_task_id=task.get_id(),
design_id=design_id,
action=hd_fields.OrchestratorAction.IdentifyNode,
task_scope=task_scope)
self.logger.info(
"Starting node driver task %s to identify node - attempt %s"
% (node_identify_task.get_id(),
node_identify_attempts + 1))
node_driver.execute_task(node_identify_task.get_id())
node_identify_attempts = node_identify_attempts + 1
node_identify_task = self.state_manager.get_task(
node_identify_task.get_id())
if node_identify_task.get_result(
) == hd_fields.ActionResult.Success:
worked = True
break
elif node_identify_task.get_result() in [
hd_fields.ActionResult.PartialSuccess,
hd_fields.ActionResult.Failure
]:
if node_identify_attempts > max_attempts:
failed = True
break
time.sleep(cfg.CONF.poll_interval)
# We can only commission nodes that were successfully identified in the provisioner
if len(node_identify_task.result_detail['successful_nodes']) > 0:
self.logger.info(
"Found %s successfully identified nodes, starting commissioning."
%
(len(node_identify_task.result_detail['successful_nodes'])
))
node_commission_task = self.create_task(
tasks.DriverTask,
parent_task_id=task.get_id(),
design_id=design_id,
action=hd_fields.OrchestratorAction.ConfigureHardware,
task_scope={
'node_names':
node_identify_task.result_detail['successful_nodes']
})
self.logger.info(
"Starting node driver task %s to commission nodes." %
(node_commission_task.get_id()))
node_driver.execute_task(node_commission_task.get_id())
node_commission_task = self.state_manager.get_task(
node_commission_task.get_id())
if node_commission_task.get_result() in [
hd_fields.ActionResult.Success,
hd_fields.ActionResult.PartialSuccess
]:
worked = True
elif node_commission_task.get_result() in [
hd_fields.ActionResult.Failure,
hd_fields.ActionResult.PartialSuccess
]:
failed = True
else:
self.logger.warning(
"No nodes successfully identified, skipping commissioning subtask"
)
final_result = None
if worked and failed:
final_result = hd_fields.ActionResult.PartialSuccess
elif worked:
final_result = hd_fields.ActionResult.Success
else:
final_result = hd_fields.ActionResult.Failure
self.task_field_update(
task_id,
status=hd_fields.TaskStatus.Complete,
result=final_result)
return
elif task.action == hd_fields.OrchestratorAction.DeployNode:
failed = worked = False
self.task_field_update(
task_id, status=hd_fields.TaskStatus.Running)
node_driver = self.enabled_drivers['node']
if node_driver is None:
self.task_field_update(
task_id,
status=hd_fields.TaskStatus.Errored,
result=hd_fields.ActionResult.Failure,
result_detail={
'detail': 'Error: No node driver configured',
'retry': False
})
return
site_design = self.get_effective_site(design_id)
node_filter = task.node_filter
target_nodes = self.process_node_filter(node_filter, site_design)
target_names = [x.get_name() for x in target_nodes]
task_scope = {'node_names': target_names}
node_networking_task = self.create_task(
tasks.DriverTask,
parent_task_id=task.get_id(),
design_id=design_id,
action=hd_fields.OrchestratorAction.ApplyNodeNetworking,
task_scope=task_scope)
self.logger.info(
"Starting node driver task %s to apply networking on nodes." %
(node_networking_task.get_id()))
node_driver.execute_task(node_networking_task.get_id())
node_networking_task = self.state_manager.get_task(
node_networking_task.get_id())
if node_networking_task.get_result() in [
hd_fields.ActionResult.Success,
hd_fields.ActionResult.PartialSuccess
]:
worked = True
if node_networking_task.get_result() in [
hd_fields.ActionResult.Failure,
hd_fields.ActionResult.PartialSuccess
]:
failed = True
node_storage_task = None
if len(node_networking_task.result_detail['successful_nodes']) > 0:
self.logger.info(
"Found %s successfully networked nodes, configuring storage."
% (len(node_networking_task.result_detail[
'successful_nodes'])))
node_storage_task = self.create_task(
tasks.DriverTask,
parent_task_id=task.get_id(),
design_id=design_id,
action=hd_fields.OrchestratorAction.ApplyNodeStorage,
task_scope={
'node_names':
node_networking_task.result_detail['successful_nodes']
})
self.logger.info(
"Starting node driver task %s to configure node storage." %
(node_storage_task.get_id()))
node_driver.execute_task(node_storage_task.get_id())
node_storage_task = self.state_manager.get_task(
node_storage_task.get_id())
if node_storage_task.get_result() in [
hd_fields.ActionResult.Success,
hd_fields.ActionResult.PartialSuccess
]:
worked = True
elif node_storage_task.get_result() in [
hd_fields.ActionResult.Failure,
hd_fields.ActionResult.PartialSuccess
]:
failed = True
else:
self.logger.warning(
"No nodes successfully networked, skipping storage configuration subtask."
)
node_platform_task = None
if (node_storage_task is not None and
len(node_storage_task.result_detail['successful_nodes']) >
0):
self.logger.info(
"Configured storage on %s nodes, configuring platform." %
(len(node_storage_task.result_detail['successful_nodes'])))
node_platform_task = self.create_task(
tasks.DriverTask,
parent_task_id=task.get_id(),
design_id=design_id,
action=hd_fields.OrchestratorAction.ApplyNodePlatform,
task_scope={
'node_names':
node_storage_task.result_detail['successful_nodes']
})
self.logger.info(
"Starting node driver task %s to configure node platform."
% (node_platform_task.get_id()))
node_driver.execute_task(node_platform_task.get_id())
node_platform_task = self.state_manager.get_task(
node_platform_task.get_id())
if node_platform_task.get_result() in [
hd_fields.ActionResult.Success,
hd_fields.ActionResult.PartialSuccess
]:
worked = True
elif node_platform_task.get_result() in [
hd_fields.ActionResult.Failure,
hd_fields.ActionResult.PartialSuccess
]:
failed = True
else:
self.logger.warning(
"No nodes with storage configuration, skipping platform configuration subtask."
)
node_deploy_task = None
if node_platform_task is not None and len(
node_platform_task.result_detail['successful_nodes']) > 0:
self.logger.info(
"Configured platform on %s nodes, starting deployment." %
(len(node_platform_task.result_detail['successful_nodes'])
))
node_deploy_task = self.create_task(
tasks.DriverTask,
parent_task_id=task.get_id(),
design_id=design_id,
action=hd_fields.OrchestratorAction.DeployNode,
task_scope={
'node_names':
node_platform_task.result_detail['successful_nodes']
})
self.logger.info(
"Starting node driver task %s to deploy nodes." %
(node_deploy_task.get_id()))
node_driver.execute_task(node_deploy_task.get_id())
node_deploy_task = self.state_manager.get_task(
node_deploy_task.get_id())
if node_deploy_task.get_result() in [
hd_fields.ActionResult.Success,
hd_fields.ActionResult.PartialSuccess
]:
worked = True
elif node_deploy_task.get_result() in [
hd_fields.ActionResult.Failure,
hd_fields.ActionResult.PartialSuccess
]:
failed = True
else:
self.logger.warning(
"Unable to configure platform on any nodes, skipping deploy subtask"
)
final_result = None
if worked and failed:
final_result = hd_fields.ActionResult.PartialSuccess
elif worked:
final_result = hd_fields.ActionResult.Success
else:
final_result = hd_fields.ActionResult.Failure
self.task_field_update(
task_id,
status=hd_fields.TaskStatus.Complete,
result=final_result)
else:
raise errors.OrchestratorError("Action %s not supported" %
(task.action))
"""
terminate_task
Mark a task for termination and optionally propagate the termination
recursively to all subtasks
"""
def terminate_task(self, task_id, propagate=True):
task = self.state_manager.get_task(task_id)
if task is None:
raise errors.OrchestratorError("Could not find task %s" % task_id)
else:
# Terminate initial task first to prevent add'l subtasks
self.task_field_update(task_id, terminate=True)
if propagate:
# Get subtasks list
subtasks = task.get_subtasks()
for st in subtasks:
self.terminate_task(st, propagate=True)
else:
return True
def create_task(self, task_class, **kwargs):
parent_task_id = kwargs.get('parent_task_id', None)
new_task = task_class(**kwargs)
self.state_manager.post_task(new_task)
if parent_task_id is not None:
self.task_subtask_add(parent_task_id, new_task.get_id())
return new_task
# Lock a task and make all field updates, then unlock it
def task_field_update(self, task_id, **kwargs):
lock_id = self.state_manager.lock_task(task_id)
if lock_id is not None:
task = self.state_manager.get_task(task_id)
for k, v in kwargs.items():
setattr(task, k, v)
self.state_manager.put_task(task, lock_id=lock_id)
self.state_manager.unlock_task(task_id, lock_id)
return True
else:
return False
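The lock/update/unlock pattern in `task_field_update` can be exercised against a toy in-memory state manager; `ToyStateManager` and `ToyTask` below are hypothetical stand-ins for Drydock's state manager and task objects, sketching the pattern rather than the real API:

```python
import uuid


class ToyStateManager:
    """Hypothetical in-memory stand-in for the Drydock state manager."""

    def __init__(self):
        self.tasks = {}
        self.locks = {}

    def get_task(self, task_id):
        return self.tasks.get(task_id)

    def lock_task(self, task_id):
        # Only one caller may hold a task lock at a time
        if task_id in self.locks:
            return None
        lock_id = uuid.uuid4()
        self.locks[task_id] = lock_id
        return lock_id

    def put_task(self, task, lock_id=None):
        if self.locks.get(task.task_id) != lock_id:
            raise RuntimeError("task not locked by caller")
        self.tasks[task.task_id] = task

    def unlock_task(self, task_id, lock_id):
        if self.locks.get(task_id) == lock_id:
            del self.locks[task_id]


class ToyTask:
    def __init__(self, task_id):
        self.task_id = task_id
        self.status = 'created'


def task_field_update(state, task_id, **kwargs):
    """Lock a task, apply all field updates, then unlock it."""
    lock_id = state.lock_task(task_id)
    if lock_id is None:
        return False
    task = state.get_task(task_id)
    for k, v in kwargs.items():
        setattr(task, k, v)
    state.put_task(task, lock_id=lock_id)
    state.unlock_task(task_id, lock_id)
    return True


state = ToyStateManager()
tid = uuid.uuid4()
state.tasks[tid] = ToyTask(tid)
```

The lock id acts as a fencing token: writes are rejected unless the caller proves it holds the current lock.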
def task_subtask_add(self, task_id, subtask_id):
lock_id = self.state_manager.lock_task(task_id)
if lock_id is not None:
task = self.state_manager.get_task(task_id)
task.register_subtask(subtask_id)
self.state_manager.put_task(task, lock_id=lock_id)
self.state_manager.unlock_task(task_id, lock_id)
return True
else:
return False
def compute_model_inheritance(self, site_design):
# For now the only thing that really incorporates inheritance is
# host profiles and baremetal nodes. So we'll just resolve it for
# the baremetal nodes which recursively resolves it for host profiles
# assigned to those nodes
for n in getattr(site_design, 'baremetal_nodes', []):
n.compile_applied_model(site_design)
return
"""
get_described_site returns the site design as currently described;
get_effective_site computes the effective design by applying
inheritance and references, returning a site model reflecting the
effective design for the site
"""
def get_described_site(self, design_id):
site_design = self.state_manager.get_design(design_id)
return site_design
def get_effective_site(self, design_id):
site_design = self.get_described_site(design_id)
self.compute_model_inheritance(site_design)
return site_design
def process_node_filter(self, node_filter, site_design):
target_nodes = site_design.baremetal_nodes
if node_filter is None:
return target_nodes
node_names = node_filter.get('node_names', [])
node_racks = node_filter.get('rack_names', [])
node_tags = node_filter.get('node_tags', [])
if len(node_names) > 0:
target_nodes = [
x for x in target_nodes if x.get_name() in node_names
]
if len(node_racks) > 0:
target_nodes = [
x for x in target_nodes if x.get_rack() in node_racks
]
if len(node_tags) > 0:
target_nodes = [
x for x in target_nodes for t in node_tags if x.has_tag(t)
]
return target_nodes
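The filter semantics of `process_node_filter` (name and rack filters intersect the node set; the tag filter selects any node carrying a listed tag) can be exercised standalone; `FakeNode` below is a hypothetical stand-in for a baremetal node object:

```python
class FakeNode:
    """Hypothetical stand-in for a baremetal node object."""

    def __init__(self, name, rack, tags):
        self.name, self.rack, self.tags = name, rack, set(tags)

    def get_name(self):
        return self.name

    def get_rack(self):
        return self.rack

    def has_tag(self, tag):
        return tag in self.tags


def process_node_filter(node_filter, target_nodes):
    """Apply name, rack, and tag filters successively, as above."""
    if node_filter is None:
        return target_nodes
    node_names = node_filter.get('node_names', [])
    node_racks = node_filter.get('rack_names', [])
    node_tags = node_filter.get('node_tags', [])
    if node_names:
        target_nodes = [x for x in target_nodes if x.get_name() in node_names]
    if node_racks:
        target_nodes = [x for x in target_nodes if x.get_rack() in node_racks]
    if node_tags:
        target_nodes = [
            x for x in target_nodes for t in node_tags if x.has_tag(t)
        ]
    return target_nodes


nodes = [
    FakeNode('n1', 'r1', ['compute']),
    FakeNode('n2', 'r1', ['control']),
    FakeNode('n3', 'r2', ['compute']),
]
hits = process_node_filter({'rack_names': ['r1'], 'node_tags': ['compute']},
                           nodes)
```

Note one quirk of the nested tag comprehension: a node matching several listed tags is emitted once per matching tag, so callers wanting uniqueness would need to deduplicate.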
"""Orchestrator for managing the deployment workflow."""

# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Actions for the Orchestrator level of the Drydock workflow."""
import time
import logging
import concurrent.futures
import uuid
import drydock_provisioner.config as config
import drydock_provisioner.error as errors
import drydock_provisioner.objects.fields as hd_fields
class BaseAction(object):
"""The base class for actions started by the orchestrator."""
def __init__(self, task, orchestrator, state_manager):
"""Object initializer.
:param task: objects.Task instance this action will execute against
:param orchestrator: orchestrator.Orchestrator instance
:param state_manager: state.DrydockState instance used to access task state
"""
self.task = task
self.orchestrator = orchestrator
self.state_manager = state_manager
self.logger = logging.getLogger(
config.config_mgr.conf.logging.global_logger_name)
def _parallelize_subtasks(self, fn, subtask_id_list, *args, **kwargs):
"""Spawn threads to execute fn for each subtask using concurrent.futures.
Return a dictionary of task_id:concurrent.futures.Future instances.
:param fn: The callable to execute in a thread; expected to take a task_id as its first argument
:param subtask_id_list: List of uuid.UUID IDs of the subtasks to execute on
:param *args: The args to pass to fn
:param **kwargs: The kwargs to pass to fn
"""
task_futures = dict()
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as te:
for t in subtask_id_list:
task_futures[t.bytes] = te.submit(fn, t, *args, **kwargs)
return task_futures
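The fan-out pattern in `_parallelize_subtasks` can be sketched in isolation; `fake_execute_task` is a hypothetical worker standing in for a driver's `execute_task`:

```python
import concurrent.futures
import uuid


def parallelize_subtasks(fn, subtask_id_list, *args, **kwargs):
    """Submit fn(task_id, ...) per subtask; key futures by the UUID's bytes."""
    task_futures = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as te:
        for t in subtask_id_list:
            task_futures[t.bytes] = te.submit(fn, t, *args, **kwargs)
    return task_futures


def fake_execute_task(task_id, prefix):
    # Hypothetical worker standing in for a driver's execute_task()
    return "%s:%s" % (prefix, task_id.hex)


ids = [uuid.uuid4() for _ in range(3)]
futures = parallelize_subtasks(fake_execute_task, ids, 'done')
```

One subtlety worth noting: exiting the `with` block calls `shutdown(wait=True)`, so in this form every submitted callable has already completed by the time the dictionary is returned.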
def _collect_subtask_futures(self, subtask_futures, timeout=300):
"""Collect Futures executing on subtasks or timeout.
Wait for Futures to finish or timeout. After timeout, enumerate the subtasks
that timed out in task result messages.
:param subtask_futures: dictionary of subtask_id.bytes -> Future instance
:param timeout: The number of seconds to wait for all Futures to complete
"""
finished, timed_out = concurrent.futures.wait(
subtask_futures.values(), timeout=timeout)
self.task.align_result()
for k, v in subtask_futures.items():
if not v.done():
self.task.add_status_msg(
"Subtask thread for %s still executing after timeout." %
str(uuid.UUID(bytes=k)),
error=True,
ctx=str(self.task.get_id()),
ctx_type='task')
self.task.failure()
else:
if v.exception():
self.logger.error(
"Uncaught exception in subtask %s future:" % str(
uuid.UUID(bytes=k)),
exc_info=v.exception())
if len(timed_out) > 0:
raise errors.CollectSubtaskTimeout(
"One or more subtask threads did not finish in %d seconds." %
timeout)
return
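The collect-or-timeout behavior of `_collect_subtask_futures` rests on `concurrent.futures.wait` with a deadline; a minimal sketch (using a toy fast/slow pair of futures in place of real subtask threads):

```python
import concurrent.futures
import time
import uuid


def collect_subtask_futures(subtask_futures, timeout=1.0):
    """Wait up to timeout, then report which subtask IDs are still running."""
    concurrent.futures.wait(subtask_futures.values(), timeout=timeout)
    return [
        uuid.UUID(bytes=k) for k, v in subtask_futures.items()
        if not v.done()
    ]


te = concurrent.futures.ThreadPoolExecutor(max_workers=2)
fast_id, slow_id = uuid.uuid4(), uuid.uuid4()
subtask_futures = {
    fast_id.bytes: te.submit(lambda: 'ok'),   # finishes immediately
    slow_id.bytes: te.submit(time.sleep, 2),  # outlives the timeout
}
late = collect_subtask_futures(subtask_futures, timeout=0.5)
te.shutdown(wait=False)
```

The keys are `uuid.UUID.bytes`, so reporting reconstructs the UUID with `uuid.UUID(bytes=k)`, mirroring the status messages above.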
def _load_site_design(self):
"""Load the site design from this action's task.
The design_ref in the task can be resolved to a set of design documents
that reflect the site design to be operated on. Load this design for use
by this action.
"""
design_status, site_design = self.orchestrator.get_effective_site(
self.task.design_ref)
if design_status is None or design_status.status == hd_fields.ActionResult.Failure:
raise errors.OrchestratorError("Site design failed to load.")
return site_design
class Noop(BaseAction):
"""Dummy action to allow the full task completion flow without impacts."""
def start(self):
"""Start executing this action."""
self.logger.debug("Starting Noop Action.")
self.task.set_status(hd_fields.TaskStatus.Running)
self.task.save()
time.sleep(5)
self.task = self.state_manager.get_task(self.task.get_id())
if self.task.check_terminate():
self.logger.debug("Terminating action.")
self.task.set_status(hd_fields.TaskStatus.Terminated)
self.task.failure()
self.task.add_status_msg(
msg="Action terminated.", ctx_type='NA', ctx='NA', error=False)
else:
self.logger.debug("Marked task as successful.")
self.task.set_status(hd_fields.TaskStatus.Complete)
self.task.success()
self.task.add_status_msg(
msg="Noop action.", ctx_type='NA', ctx='NA', error=False)
self.task.save()
self.logger.debug("Saved task state.")
self.logger.debug("Finished Noop Action.")
return
class DestroyNodes(BaseAction):
"""Action to destroy nodes in preparation for a redeploy."""
def start(self):
"""Start executing this action."""
self.task.set_status(hd_fields.TaskStatus.Complete)
self.task.failure()
self.task.save()
return
class ValidateDesign(BaseAction):
"""Action for validating the design document referenced by the task."""
def start(self):
"""Start executing this action."""
self.task.set_status(hd_fields.TaskStatus.Running)
self.task.save()
try:
status, site_design = self.orchestrator.get_effective_site(
self.task.design_ref)
self.task.merge_status_messages(task_result=status)
self.task.set_status(hd_fields.TaskStatus.Complete)
self.task.success()
self.task.save()
except Exception:
self.task.set_status(hd_fields.TaskStatus.Complete)
self.task.failure()
self.task.save()
return
class VerifySite(BaseAction):
"""Action to verify downstream tools in the site are available and ready."""
def start(self):
"""Start executing this action in the context of the local task."""
self.task.set_status(hd_fields.TaskStatus.Running)
self.task.save()
node_driver = self.orchestrator.enabled_drivers['node']
if node_driver is None:
self.task.set_status(hd_fields.TaskStatus.Complete)
self.task.add_status_msg(
msg="No node driver enabled, ending task.",
error=True,
ctx=str(self.task.get_id()),
ctx_type='task')
self.task.result.set_message("No NodeDriver enabled.")
self.task.result.set_reason("Bad Configuration.")
self.task.failure()
self.task.save()
return
node_driver_task = self.orchestrator.create_task(
design_ref=self.task.design_ref,
action=hd_fields.OrchestratorAction.ValidateNodeServices)
self.task.register_subtask(node_driver_task)
node_driver.execute_task(node_driver_task.get_id())
node_driver_task = self.state_manager.get_task(
node_driver_task.get_id())
self.task.add_status_msg(
msg="Collected subtask %s" % str(node_driver_task.get_id()),
error=False,
ctx=str(node_driver_task.get_id()),
ctx_type='task')
self.task = self.state_manager.get_task(self.task.get_id())
self.task.set_status(hd_fields.TaskStatus.Complete)
self.task.align_result()
self.task.save()
return
class PrepareSite(BaseAction):
"""Action to configure site wide/inter-node settings."""
def start(self):
"""Start executing this action in the context of the local task."""
self.task.set_status(hd_fields.TaskStatus.Running)
self.task.save()
driver = self.orchestrator.enabled_drivers['node']
if driver is None:
self.task.set_status(hd_fields.TaskStatus.Complete)
self.task.add_status_msg(
msg="No node driver enabled, ending task.",
error=True,
ctx=str(self.task.get_id()),
ctx_type='task')
self.task.result.set_message("No NodeDriver enabled.")
self.task.result.set_reason("Bad Configuration.")
self.task.failure()
self.task.save()
return
site_network_task = self.orchestrator.create_task(
design_ref=self.task.design_ref,
action=hd_fields.OrchestratorAction.CreateNetworkTemplate)
self.task.register_subtask(site_network_task)
self.logger.info(
"Starting node driver task %s to create network templates" %
(site_network_task.get_id()))
driver.execute_task(site_network_task.get_id())
self.task.add_status_msg(
msg="Collected subtask %s" % str(site_network_task.get_id()),
error=False,
ctx=str(site_network_task.get_id()),
ctx_type='task')
self.logger.info("Node driver task %s complete" %
(site_network_task.get_id()))
user_creds_task = self.orchestrator.create_task(
design_ref=self.task.design_ref,
action=hd_fields.OrchestratorAction.ConfigureUserCredentials)
self.task.register_subtask(user_creds_task)
self.logger.info(
"Starting node driver task %s to configure user credentials" %
(user_creds_task.get_id()))
driver.execute_task(user_creds_task.get_id())
self.task.add_status_msg(
msg="Collected subtask %s" % str(user_creds_task.get_id()),
error=False,
ctx=str(user_creds_task.get_id()),
ctx_type='task')
self.logger.info("Node driver task %s complete" %
(user_creds_task.get_id()))
self.task.align_result()
self.task.set_status(hd_fields.TaskStatus.Complete)
self.task.save()
return
class VerifyNodes(BaseAction):
"""Action to verify the orchestrator has adequate access to a node to start the deployment."""
def start(self):
"""Start executing this action."""
self.task.set_status(hd_fields.TaskStatus.Running)
self.task.save()
design_status, site_design = self.orchestrator.get_effective_site(
self.task.design_ref)
node_filter = self.task.node_filter
oob_type_partition = {}
target_nodes = self.orchestrator.process_node_filter(
node_filter, site_design)
for n in target_nodes:
if n.oob_type not in oob_type_partition.keys():
oob_type_partition[n.oob_type] = []
oob_type_partition[n.oob_type].append(n)
task_futures = dict()
for oob_type, oob_nodes in oob_type_partition.items():
oob_driver = None
for d in self.orchestrator.enabled_drivers['oob']:
if d.oob_type_support(oob_type):
oob_driver = d
break
if oob_driver is None:
self.logger.warning(
"Node OOB type %s has no enabled driver." % oob_type)
self.task.failure()
for n in oob_nodes:
self.task.add_status_msg(
msg="Node %s OOB type %s is not supported." %
(n.get_name(), oob_type),
error=True,
ctx=n.get_name(),
ctx_type='node')
continue
nf = self.orchestrator.create_nodefilter_from_nodelist(oob_nodes)
oob_driver_task = self.orchestrator.create_task(
design_ref=self.task.design_ref,
action=hd_fields.OrchestratorAction.InterrogateOob,
node_filter=nf)
self.task.register_subtask(oob_driver_task)
self.logger.info(
"Starting task %s for node verification via OOB type %s" %
(oob_driver_task.get_id(), oob_type))
task_futures.update(
self._parallelize_subtasks(oob_driver.execute_task,
[oob_driver_task.get_id()]))
try:
self._collect_subtask_futures(
task_futures,
timeout=(config.config_mgr.conf.timeouts.drydock_timeout * 60))
self.logger.debug(
"Collected subtasks for task %s" % str(self.task.get_id()))
except errors.CollectSubtaskTimeout as ex:
self.logger.warning(str(ex))
self.task.set_status(hd_fields.TaskStatus.Complete)
return
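The OOB-type partitioning at the top of `start()` recurs across several actions; the loop can be written with `dict.setdefault`, sketched here with a hypothetical `FakeNode` stub:

```python
class FakeNode:
    """Hypothetical stand-in for a baremetal node object."""

    def __init__(self, name, oob_type):
        self.name = name
        self.oob_type = oob_type


def partition_by_oob_type(nodes):
    """Group nodes by OOB type (an idiomatic form of the loop above)."""
    partition = {}
    for n in nodes:
        partition.setdefault(n.oob_type, []).append(n)
    return partition


nodes = [
    FakeNode('n1', 'ipmi'),
    FakeNode('n2', 'libvirt'),
    FakeNode('n3', 'ipmi'),
]
partition = partition_by_oob_type(nodes)
```

Each OOB driver then receives only the slice of nodes whose `oob_type` it supports.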
class PrepareNodes(BaseAction):
"""Action to prepare a node for deployment."""
def start(self):
"""Start executing this action."""
self.task.set_status(hd_fields.TaskStatus.Running)
self.task.save()
# NOTE(sh8121att) Should we attempt to interrogate the node via Node
# Driver to see if it is in a deployed state before we
# start rebooting? Or do we just leverage
# Drydock internal state via site build data (when implemented)?
node_driver = self.orchestrator.enabled_drivers['node']
if node_driver is None:
self.task.set_status(hd_fields.TaskStatus.Complete)
self.task.add_status_msg(
msg="No node driver enabled, ending task.",
error=True,
ctx=str(self.task.get_id()),
ctx_type='task')
self.task.result.set_message("No NodeDriver enabled.")
self.task.result.set_reason("Bad Configuration.")
self.task.failure()
self.task.save()
return
design_status, site_design = self.orchestrator.get_effective_site(
self.task.design_ref)
target_nodes = self.orchestrator.process_node_filter(
self.task.node_filter, site_design)
oob_type_partition = {}
for n in target_nodes:
if n.oob_type not in oob_type_partition.keys():
oob_type_partition[n.oob_type] = []
oob_type_partition[n.oob_type].append(n)
task_futures = dict()
oob_type_filters = dict()
for oob_type, oob_nodes in oob_type_partition.items():
oob_driver = None
for d in self.orchestrator.enabled_drivers['oob']:
if d.oob_type_support(oob_type):
oob_driver = d
break
if oob_driver is None:
self.logger.warning(
"Node OOB type %s has no enabled driver." % oob_type)
self.task.failure()
for n in oob_nodes:
self.task.add_status_msg(
msg="Node %s OOB type %s is not supported." %
(n.get_name(), oob_type),
error=True,
ctx=n.get_name(),
ctx_type='node')
continue
oob_type_filters[
oob_type] = self.orchestrator.create_nodefilter_from_nodelist(
oob_nodes)
setboot_task = self.orchestrator.create_task(
design_ref=self.task.design_ref,
action=hd_fields.OrchestratorAction.SetNodeBoot,
node_filter=oob_type_filters[oob_type])
self.task.register_subtask(setboot_task)
self.logger.info(
"Starting OOB driver task %s to set PXE boot for OOB type %s" %
(setboot_task.get_id(), oob_type))
task_futures.update(
self._parallelize_subtasks(oob_driver.execute_task,
[setboot_task.get_id()]))
try:
self._collect_subtask_futures(
task_futures,
timeout=(config.config_mgr.conf.timeouts.drydock_timeout * 60))
# Get successful nodes and add it to the node filter
# so the next step only happens for successfully configured nodes
self.task.bubble_results(
action_filter=hd_fields.OrchestratorAction.SetNodeBoot)
for t, f in oob_type_filters.items():
oob_type_filters[t]['filter_set'].append(
dict(
filter_type='union',
node_names=self.task.result.successes))
self.logger.debug(
"Collected subtasks for task %s" % str(self.task.get_id()))
except errors.CollectSubtaskTimeout as ex:
self.logger.warning(str(ex))
task_futures = dict()
for oob_type, oob_nodes in oob_type_partition.items():
oob_driver = None
for d in self.orchestrator.enabled_drivers['oob']:
if d.oob_type_support(oob_type):
oob_driver = d
break
if oob_driver is None:
self.logger.warning(
"Node OOB type %s has no enabled driver." % oob_type)
self.task.failure()
for n in oob_nodes:
self.task.add_status_msg(
msg="Node %s OOB type %s is not supported." %
(n.get_name(), oob_type),
error=True,
ctx=n.get_name(),
ctx_type='node')
continue
cycle_task = self.orchestrator.create_task(
design_ref=self.task.design_ref,
action=hd_fields.OrchestratorAction.PowerCycleNode,
node_filter=oob_type_filters[oob_type])
self.task.register_subtask(cycle_task)
self.logger.info(
"Starting OOB driver task %s to power cycle nodes for OOB type %s"
% (cycle_task.get_id(), oob_type))
task_futures.update(
self._parallelize_subtasks(oob_driver.execute_task,
[cycle_task.get_id()]))
try:
self._collect_subtask_futures(
task_futures,
timeout=(config.config_mgr.conf.timeouts.drydock_timeout * 60))
# Get successful nodes and add it to the node filter
# so the next step only happens for successfully configured nodes
self.task.bubble_results(
action_filter=hd_fields.OrchestratorAction.PowerCycleNode)
for t, f in oob_type_filters.items():
oob_type_filters[t]['filter_set'].append(
dict(
filter_type='union',
node_names=self.task.result.successes))
self.logger.debug(
"Collected subtasks for task %s" % str(self.task.get_id()))
except errors.CollectSubtaskTimeout as ex:
self.logger.warning(str(ex))
# IdentifyNode success will take some time after PowerCycleNode finishes
# Retry the operation a few times if it fails before considering it a final failure
# Each attempt is a new task which might make the final task tree a bit confusing
max_attempts = config.config_mgr.conf.timeouts.identify_node * (
60 / config.config_mgr.conf.poll_interval)
self.logger.debug(
"Will make max of %d attempts to complete the identify_node task."
% max_attempts)
nf = self.task.node_filter_from_successes()
node_identify_task = None
while True:
if node_identify_task is None:
node_identify_task = self.orchestrator.create_task(
design_ref=self.task.design_ref,
action=hd_fields.OrchestratorAction.IdentifyNode,
node_filter=nf)
self.task.register_subtask(node_identify_task)
self.logger.info(
"Starting node driver task %s to identify nodes." %
(node_identify_task.get_id()))
node_driver.execute_task(node_identify_task.get_id())
node_identify_task = self.state_manager.get_task(
node_identify_task.get_id())
node_identify_task.bubble_results()
try:
if not node_identify_task.retry_task(
max_attempts=max_attempts):
break
time.sleep(config.config_mgr.conf.poll_interval)
except errors.MaxRetriesReached:
self.task.failure()
break
# We can only commission nodes that were successfully identified in the provisioner
if len(node_identify_task.result.successes) > 0:
target_nf = node_identify_task.node_filter_from_successes()
self.logger.info(
"Found %s successfully identified nodes, starting commissioning."
% (len(node_identify_task.result.successes)))
node_commission_task = None
while True:
if node_commission_task is None:
node_commission_task = self.orchestrator.create_task(
design_ref=self.task.design_ref,
action=hd_fields.OrchestratorAction.ConfigureHardware,
node_filter=target_nf)
self.task.register_subtask(node_commission_task)
self.logger.info(
"Starting node driver task %s to commission nodes." %
(node_commission_task.get_id()))
node_driver.execute_task(node_commission_task.get_id())
node_commission_task = self.state_manager.get_task(
node_commission_task.get_id())
try:
if not node_commission_task.retry_task(max_attempts=3):
break
except errors.MaxRetriesReached:
self.task.failure()
break
else:
self.logger.warning(
"No nodes successfully identified, skipping commissioning subtask"
)
self.task.align_result()
self.task.set_status(hd_fields.TaskStatus.Complete)
self.task.save()
return
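The IdentifyNode retry budget above converts a timeout expressed in minutes into one attempt per poll interval. A minimal sketch of that arithmetic, using hypothetical values for the `identify_node` and `poll_interval` config options:

```python
# Sketch of the retry-budget arithmetic used for the IdentifyNode subtask:
# a timeout in minutes is divided into one attempt per poll interval in
# seconds. The option names come from the diff; the values are examples.
def identify_retry_budget(identify_node_minutes, poll_interval_seconds):
    return identify_node_minutes * (60 / poll_interval_seconds)

# a 2-minute identify window polled every 10 seconds allows 12 attempts
print(identify_retry_budget(2, 10))
```

Note the result is a float under Python 3 division; `retry_task` comparisons still work, but an `int()` wrap would make the attempt count exact.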
class DeployNodes(BaseAction):
"""Action to deploy a node with a persistent OS."""
def start(self):
"""Start executing this action."""
self.task.set_status(hd_fields.TaskStatus.Running)
self.task.save()
node_driver = self.orchestrator.enabled_drivers['node']
if node_driver is None:
self.task.set_status(hd_fields.TaskStatus.Complete)
self.task.add_status_msg(
msg="No node driver enabled, ending task.",
error=True,
ctx=str(self.task.get_id()),
ctx_type='task')
self.task.result.set_message("No NodeDriver enabled.")
self.task.result.set_reason("Bad Configuration.")
self.task.failure()
self.task.save()
return
node_networking_task = self.orchestrator.create_task(
design_ref=self.task.design_ref,
action=hd_fields.OrchestratorAction.ApplyNodeNetworking,
node_filter=self.task.node_filter)
self.task.register_subtask(node_networking_task)
self.logger.info(
"Starting node driver task %s to apply networking on nodes." %
(node_networking_task.get_id()))
node_driver.execute_task(node_networking_task.get_id())
node_networking_task = self.state_manager.get_task(
node_networking_task.get_id())
node_storage_task = None
if len(node_networking_task.result.successes) > 0:
self.logger.info(
"Found %s successfully networked nodes, configuring storage." %
(len(node_networking_task.result.successes)))
node_storage_task = self.orchestrator.create_task(
design_ref=self.task.design_ref,
action=hd_fields.OrchestratorAction.ApplyNodeStorage,
node_filter=node_networking_task.node_filter_from_successes())
self.logger.info(
"Starting node driver task %s to configure node storage." %
(node_storage_task.get_id()))
node_driver.execute_task(node_storage_task.get_id())
node_storage_task = self.state_manager.get_task(
node_storage_task.get_id())
else:
self.logger.warning(
"No nodes successfully networked, skipping storage configuration subtask."
)
node_platform_task = None
if (node_storage_task is not None
and len(node_storage_task.result.successes) > 0):
self.logger.info(
"Configured storage on %s nodes, configuring platform." %
(len(node_storage_task.result.successes)))
node_platform_task = self.orchestrator.create_task(
design_ref=self.task.design_ref,
action=hd_fields.OrchestratorAction.ApplyNodePlatform,
node_filter=node_storage_task.node_filter_from_successes())
self.task.register_subtask(node_platform_task)
self.logger.info(
"Starting node driver task %s to configure node platform." %
(node_platform_task.get_id()))
node_driver.execute_task(node_platform_task.get_id())
node_platform_task = self.state_manager.get_task(
node_platform_task.get_id())
else:
self.logger.warning(
"No nodes with storage configuration, skipping platform configuration subtask."
)
node_deploy_task = None
if node_platform_task is not None and len(
node_platform_task.result.successes) > 0:
self.logger.info(
"Configured platform on %s nodes, starting deployment." %
(len(node_platform_task.result.successes)))
while True:
if node_deploy_task is None:
node_deploy_task = self.orchestrator.create_task(
design_ref=self.task.design_ref,
action=hd_fields.OrchestratorAction.DeployNode,
node_filter=node_platform_task.
node_filter_from_successes())
self.logger.info(
"Starting node driver task %s to deploy nodes." %
(node_deploy_task.get_id()))
node_driver.execute_task(node_deploy_task.get_id())
node_deploy_task = self.state_manager.get_task(
node_deploy_task.get_id())
try:
if not node_deploy_task.retry_task(max_attempts=3):
break
except errors.MaxRetriesReached:
self.task.failure()
break
else:
self.logger.warning(
"Unable to configure platform on any nodes, skipping deploy subtask"
)
self.task.set_status(hd_fields.TaskStatus.Complete)
self.task.align_result()
self.task.save()
return
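Both PrepareNodes and DeployNodes drive their subtasks with the same create-once/retry loop: execute the task, then let `retry_task()` return `False` to stop on success, `True` to run again, or raise `MaxRetriesReached`. A simplified stand-in of that contract (the exception and task class here are illustrative fakes, not drydock's real `errors.MaxRetriesReached` or `objects.Task`):

```python
# Simplified stand-in for the create-once / retry loop used by PrepareNodes
# and DeployNodes. FakeTask mimics only the retry_task contract.
class MaxRetriesReached(Exception):
    """Raised when a task has exhausted its retry budget."""


class FakeTask:
    def __init__(self, succeed_on):
        self.attempts = 0
        self.succeed_on = succeed_on  # attempt number that succeeds

    def execute(self):
        self.attempts += 1

    def retry_task(self, max_attempts):
        # False means done, True means run again, raise when out of budget.
        if self.attempts >= self.succeed_on:
            return False
        if self.attempts >= max_attempts:
            raise MaxRetriesReached()
        return True


def run_with_retries(task, max_attempts):
    """Drive a task the same way as the while-True loops above."""
    while True:
        task.execute()
        try:
            if not task.retry_task(max_attempts=max_attempts):
                return True  # completed within budget
        except MaxRetriesReached:
            return False  # parent action would mark self.task.failure()
```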


@ -0,0 +1,496 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Workflow orchestrator for Drydock tasks."""
import time
import importlib
import logging
import uuid
import concurrent.futures
import drydock_provisioner.config as config
import drydock_provisioner.objects as objects
import drydock_provisioner.error as errors
import drydock_provisioner.objects.fields as hd_fields
from .actions.orchestrator import Noop
from .actions.orchestrator import ValidateDesign
from .actions.orchestrator import VerifySite
from .actions.orchestrator import PrepareSite
from .actions.orchestrator import VerifyNodes
from .actions.orchestrator import PrepareNodes
from .actions.orchestrator import DeployNodes
from .actions.orchestrator import DestroyNodes
class Orchestrator(object):
"""Defines functionality for task execution workflow."""
def __init__(self, enabled_drivers=None, state_manager=None,
ingester=None):
"""Initialize the orchestrator. A single instance should be executing at a time.
:param enabled_drivers: a dictionary of drivers to enable for executing downstream tasks
:param state_manager: the instance of statemgr.state.DrydockState to use for accessing app state
:param ingester: instance of ingester.Ingester used to process design documents
"""
self.orch_id = uuid.uuid4()
self.stop_flag = False
self.enabled_drivers = {}
self.state_manager = state_manager
self.ingester = ingester
if self.state_manager is None or self.ingester is None:
raise errors.OrchestratorError(
"Orchestrator requires instantiated state manager and ingester."
)
self.logger = logging.getLogger('drydock.orchestrator')
if enabled_drivers is not None:
oob_drivers = enabled_drivers.oob_driver
# This is because oslo_config changes the option value
# for multiopt depending on if multiple values are actually defined
for d in oob_drivers:
self.logger.info("Enabling OOB driver %s" % d)
if d is not None:
m, c = d.rsplit('.', 1)
oob_driver_class = \
getattr(importlib.import_module(m), c, None)
if oob_driver_class is not None:
if self.enabled_drivers.get('oob', None) is None:
self.enabled_drivers['oob'] = []
self.enabled_drivers['oob'].append(
oob_driver_class(
state_manager=state_manager,
orchestrator=self))
node_driver_name = enabled_drivers.node_driver
if node_driver_name is not None:
m, c = node_driver_name.rsplit('.', 1)
node_driver_class = \
getattr(importlib.import_module(m), c, None)
if node_driver_class is not None:
self.enabled_drivers['node'] = node_driver_class(
state_manager=state_manager, orchestrator=self)
network_driver_name = enabled_drivers.network_driver
if network_driver_name is not None:
m, c = network_driver_name.rsplit('.', 1)
network_driver_class = getattr(
importlib.import_module(m), c, None)
if network_driver_class is not None:
self.enabled_drivers['network'] = network_driver_class(
state_manager=state_manager, orchestrator=self)
def watch_for_tasks(self):
"""Start polling the database watching for Queued tasks to execute."""
orch_task_actions = {
hd_fields.OrchestratorAction.Noop: Noop,
hd_fields.OrchestratorAction.ValidateDesign: ValidateDesign,
hd_fields.OrchestratorAction.VerifySite: VerifySite,
hd_fields.OrchestratorAction.PrepareSite: PrepareSite,
hd_fields.OrchestratorAction.VerifyNodes: VerifyNodes,
hd_fields.OrchestratorAction.PrepareNodes: PrepareNodes,
hd_fields.OrchestratorAction.DeployNodes: DeployNodes,
hd_fields.OrchestratorAction.DestroyNodes: DestroyNodes,
}
# Loop trying to claim status as the active orchestrator
tp = concurrent.futures.ThreadPoolExecutor(max_workers=16)
while True:
if self.stop_flag:
tp.shutdown()
return
claim = self.state_manager.claim_leadership(self.orch_id)
if not claim:
self.logger.info(
"Orchestrator %s denied leadership, sleeping to try again."
% str(self.orch_id))
# TODO(sh8121att) Make this configurable
time.sleep(300)
else:
self.logger.info(
"Orchestrator %s successfully claimed leadership, polling for tasks."
% str(self.orch_id))
# As active orchestrator, loop looking for queued tasks.
task_future = None
while True:
# TODO(sh8121att) Need a timeout here
if self.stop_flag:
tp.shutdown()
self.state_manager.abdicate_leadership(self.orch_id)
return
if task_future is not None:
if task_future.done():
self.logger.debug(
"Task execution complete, looking for the next task."
)
exc = task_future.exception()
if exc is not None:
self.logger.error(
"Error in starting orchestrator action.",
exc_info=exc)
task_future = None
if task_future is None:
next_task = self.state_manager.get_next_queued_task(
allowed_actions=list(orch_task_actions.keys()))
if next_task is not None:
self.logger.info(
"Found task %s queued, starting execution." %
str(next_task.get_id()))
if next_task.check_terminate():
self.logger.info(
"Task %s marked for termination, skipping execution."
% str(next_task.get_id()))
next_task.set_status(
hd_fields.TaskStatus.Terminated)
next_task.save()
continue
action = orch_task_actions[next_task.action](
next_task, self, self.state_manager)
if action:
task_future = tp.submit(action.start)
else:
self.logger.warning(
"Task %s has unsupported action %s, ending execution."
% (str(next_task.get_id()),
next_task.action))
next_task.add_status_msg(
msg="Unsupported action %s." %
next_task.action,
error=True,
ctx=str(next_task.get_id()),
ctx_type='task')
next_task.failure()
next_task.set_status(
hd_fields.TaskStatus.Complete)
next_task.save()
else:
self.logger.info(
"No task found, waiting to poll again.")
# TODO(sh8121att) Make this configurable
time.sleep(config.config_mgr.conf.poll_interval)
claim = self.state_manager.maintain_leadership(
self.orch_id)
if not claim:
self.logger.info(
"Orchestrator %s lost leadership, attempting to reclaim."
% str(self.orch_id))
break
def stop_orchestrator(self):
"""Indicate this orchestrator instance should stop attempting to run."""
self.stop_flag = True
def terminate_task(self, task, propagate=True, terminated_by=None):
"""Mark a task for termination.
Optionally propagate the termination recursively to all subtasks
:param task: A objects.Task instance to terminate
:param propagate: whether the termination should propagate to subtasks
"""
if task is None:
raise errors.OrchestratorError(
"Cannot terminate task: no task specified.")
else:
# Terminate initial task first to prevent add'l subtasks
self.logger.debug("Terminating task %s." % str(task.get_id()))
task.terminate_task(terminated_by=terminated_by)
if propagate:
# Get subtasks list
subtasks = task.get_subtasks()
for st_id in subtasks:
st = self.state_manager.get_task(st_id)
self.terminate_task(
st, propagate=True, terminated_by=terminated_by)
def create_task(self, **kwargs):
"""Create a new task and persist it."""
new_task = objects.Task(statemgr=self.state_manager, **kwargs)
self.state_manager.post_task(new_task)
return new_task
def compute_model_inheritance(self, site_design):
"""Compute inheritance of the design model.
Given a fully populated Site model, compute the effective
design by applying inheritance and references
"""
for n in getattr(site_design, 'baremetal_nodes', []):
n.compile_applied_model(site_design)
return
def get_described_site(self, design_ref):
"""Ingest design data referenced by design_ref.
Return a tuple of the processing status and the populated instance
of SiteDesign
:param design_ref: Supported URI referencing a design document
"""
status, site_design = self.ingester.ingest_data(
design_ref=design_ref, design_state=self.state_manager)
return status, site_design
def _validate_design(self, site_design, result_status=None):
"""Validate the design in site_design passes all validation rules.
Apply all validation rules to the design in site_design. If result_status is
defined, update it with validation messages. Otherwise a new status instance
will be created and returned.
:param site_design: instance of objects.SiteDesign
:param result_status: instance of objects.TaskStatus
"""
# TODO(sh8121att) actually implement the validation rules defined in the readme
if result_status is None:
result_status = objects.TaskStatus()
result_status.set_status(hd_fields.ActionResult.Success)
return result_status
def get_effective_site(self, design_ref):
"""Ingest design data and compile the effective model of the design.
Return a tuple of the processing status and the populated instance
of SiteDesign after computing the inheritance chain
:param design_ref: Supported URI referencing a design document
"""
status = None
site_design = None
try:
status, site_design = self.get_described_site(design_ref)
if status.status == hd_fields.ActionResult.Success:
self.compute_model_inheritance(site_design)
status = self._validate_design(site_design, result_status=status)
except Exception as ex:
if status is not None:
status.add_status_msg(
"Error loading effective site: %s" % str(ex),
error=True,
ctx='NA',
ctx_type='NA')
status.set_status(hd_fields.ActionResult.Failure)
else:
self.logger.error(
"Error getting site definition: %s" % str(ex), exc_info=ex)
else:
status.add_status_msg(
msg="Successfully computed effective design.",
error=False,
ctx_type='NA',
ctx='NA')
status.set_status(hd_fields.ActionResult.Success)
return status, site_design
def get_target_nodes(self, task, failures=False, successes=False):
"""Compute list of target nodes for given ``task``.
If failures is true, then create a node_filter based on task result
failures. If successes is true, then create a node_filter based on
task result successes. If both are true, raise an exception. If neither
are true, build the list from the task node_filter.
:param task: instance of objects.Task
:param failures: whether to build target list from previous task failures
:param successes: whether to build target list from previous task successes
"""
design_status, site_design = self.get_effective_site(task.design_ref)
if design_status.status != hd_fields.ActionResult.Success:
raise errors.OrchestratorError(
"Unable to render effective site design.")
if failures and successes:
raise errors.OrchestratorError(
"Cannot specify both failures and successes.")
if failures:
if len(task.result.failures) == 0:
return []
nf = task.node_filter_from_failures()
elif successes:
if len(task.result.successes) == 0:
return []
nf = task.node_filter_from_successes()
else:
nf = task.node_filter
node_list = self.process_node_filter(nf, site_design)
return node_list
def create_nodefilter_from_nodelist(self, node_list):
"""Create a node filter to match list of nodes.
Returns a dictionary that will be properly processed by the orchestrator
:param node_list: List of objects.BaremetalNode instances the filter should match
"""
nf = dict()
nf['filter_set_type'] = 'intersection'
nf['filter_set'] = [
dict(
node_names=[x.get_id() for x in node_list],
filter_type='union')
]
return nf
def process_node_filter(self, node_filter, site_design):
target_nodes = site_design.baremetal_nodes
if node_filter is None:
return target_nodes
if not isinstance(node_filter, dict):
msg = "Invalid node_filter, must be a dictionary with keys 'filter_set_type' and 'filter_set'."
self.logger.error(msg)
raise errors.OrchestratorError(msg)
result_sets = []
for f in node_filter.get('filter_set', []):
result_sets.append(self.process_filter(target_nodes, f))
return self.join_filter_sets(
node_filter.get('filter_set_type'), result_sets)
def join_filter_sets(self, filter_set_type, result_sets):
if filter_set_type == 'union':
return self.list_union(*result_sets)
elif filter_set_type == 'intersection':
return self.list_intersection(*result_sets)
else:
raise errors.OrchestratorError(
"Unknown filter set type %s" % filter_set_type)
def process_filter(self, node_set, filter_set):
"""Take a filter and apply it to the node_set.
:param node_set: A full set of objects.BaremetalNode
:param filter_set: A filter set describing filters to apply to the node set
"""
try:
set_type = filter_set.get('filter_type', None)
node_names = filter_set.get('node_names', [])
node_tags = filter_set.get('node_tags', [])
node_labels = filter_set.get('node_labels', {})
rack_names = filter_set.get('rack_names', [])
rack_labels = filter_set.get('rack_labels', {})
target_nodes = dict()
if len(node_names) > 0:
self.logger.debug("Filtering nodes based on node names.")
target_nodes['node_names'] = [
x for x in node_set if x.get_name() in node_names
]
if len(node_tags) > 0:
self.logger.debug("Filtering nodes based on node tags.")
target_nodes['node_tags'] = [
x for x in node_set for t in node_tags if x.has_tag(t)
]
if len(rack_names) > 0:
self.logger.debug("Filtering nodes based on rack names.")
target_nodes['rack_names'] = [
x for x in node_set if x.get_rack() in rack_names
]
if len(node_labels) > 0:
self.logger.debug("Filtering nodes based on node labels.")
target_nodes['node_labels'] = []
for k, v in node_labels.items():
target_nodes['node_labels'].extend([
x for x in node_set
if getattr(x, 'owner_data', {}).get(k, None) == v
])
if len(rack_labels) > 0:
self.logger.info(
"Rack label filtering not yet implemented, returning all nodes."
)
target_nodes['rack_labels'] = node_set
if set_type == 'union':
result_set = self.list_union(
target_nodes.get('node_names', []),
target_nodes.get('node_tags', []),
target_nodes.get('rack_names', []),
target_nodes.get('node_labels', []))
elif set_type == 'intersection':
result_set = self.list_intersection(
target_nodes.get('node_names', []),
target_nodes.get('node_tags', []),
target_nodes.get('rack_names', []),
target_nodes.get('node_labels', []))
else:
raise errors.OrchestratorError(
"Unknown filter type %s" % set_type)
return result_set
except Exception as ex:
raise errors.OrchestratorError(
"Error processing node filter: %s" % str(ex))
def list_intersection(self, a, *rest):
"""Take the intersection of a with the intersection of all the rest.
:param a: list of values
:params rest: 0 or more lists of values
"""
if len(rest) > 1:
return list(
set(a).intersection(
set(self.list_intersection(rest[0], *rest[1:]))))
elif len(rest) == 1:
return list(set(a).intersection(set(rest[0])))
else:
return a
def list_union(self, *lists):
"""Return a unique-ified union of all the lists.
:param lists: indefinite number of lists
"""
results = set()
if len(lists) > 1:
for l in lists:
results = results.union(set(l))
return list(results)
elif len(lists) == 1:
return list(set(lists[0]))
else:
return None
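The orchestrator's node filters are plain dictionaries joined by the set helpers above. A sketch with iterative equivalents of `list_union`/`list_intersection` (sorted here only for deterministic output) and an example dictionary in the shape `process_node_filter` consumes; the node and rack names are hypothetical:

```python
# Iterative equivalents of the Orchestrator set helpers, for illustration.
def list_union(*lists):
    results = set()
    for l in lists:
        results |= set(l)
    return sorted(results)


def list_intersection(first, *rest):
    results = set(first)
    for r in rest:
        results &= set(r)
    return sorted(results)


# A filter matching nodes named node01/node02 that are also in rack1:
node_filter = {
    'filter_set_type': 'intersection',
    'filter_set': [
        {'filter_type': 'union', 'node_names': ['node01', 'node02']},
        {'filter_type': 'union', 'rack_names': ['rack1']},
    ],
}

print(list_union(['a', 'b'], ['b', 'c']))         # members of either list
print(list_intersection(['a', 'b'], ['b', 'c']))  # members of both lists
```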


@ -39,32 +39,26 @@ class DrydockPolicy(object):
# Orchestrator Policy
task_rules = [
policy.DocumentedRuleDefault('physical_provisioner:read_task',
'role:admin', 'Get task status', [{
'path':
'/api/v1.0/tasks',
'method':
'GET'
'role:admin', 'Get task status',
[{
'path': '/api/v1.0/tasks',
'method': 'GET'
}, {
'path':
'/api/v1.0/tasks/{task_id}',
'method':
'GET'
'path': '/api/v1.0/tasks/{task_id}',
'method': 'GET'
}]),
policy.DocumentedRuleDefault('physical_provisioner:create_task',
'role:admin', 'Create a task', [{
'path':
'/api/v1.0/tasks',
'method':
'POST'
}]),
policy.DocumentedRuleDefault('physical_provisioner:validate_design',
'role:admin',
'Create validate_design task', [{
'path':
'/api/v1.0/tasks',
'method':
'POST'
'role:admin', 'Create a task',
[{
'path': '/api/v1.0/tasks',
'method': 'POST'
}]),
policy.DocumentedRuleDefault(
'physical_provisioner:validate_design', 'role:admin',
'Create validate_design task', [{
'path': '/api/v1.0/tasks',
'method': 'POST'
}]),
policy.DocumentedRuleDefault('physical_provisioner:verify_site',
'role:admin', 'Create verify_site task',
[{
@ -77,26 +71,26 @@ class DrydockPolicy(object):
'path': '/api/v1.0/tasks',
'method': 'POST'
}]),
policy.DocumentedRuleDefault('physical_provisioner:verify_node',
'role:admin', 'Create verify_node task',
policy.DocumentedRuleDefault('physical_provisioner:verify_nodes',
'role:admin', 'Create verify_nodes task',
[{
'path': '/api/v1.0/tasks',
'method': 'POST'
}]),
policy.DocumentedRuleDefault('physical_provisioner:prepare_node',
'role:admin', 'Create prepare_node task',
policy.DocumentedRuleDefault('physical_provisioner:prepare_nodes',
'role:admin', 'Create prepare_nodes task',
[{
'path': '/api/v1.0/tasks',
'method': 'POST'
}]),
policy.DocumentedRuleDefault('physical_provisioner:deploy_node',
'role:admin', 'Create deploy_node task',
policy.DocumentedRuleDefault('physical_provisioner:deploy_nodes',
'role:admin', 'Create deploy_nodes task',
[{
'path': '/api/v1.0/tasks',
'method': 'POST'
}]),
policy.DocumentedRuleDefault('physical_provisioner:destroy_node',
'role:admin', 'Create destroy_node task',
policy.DocumentedRuleDefault('physical_provisioner:destroy_nodes',
'role:admin', 'Create destroy_nodes task',
[{
'path': '/api/v1.0/tasks',
'method': 'POST'
@ -105,31 +99,26 @@ class DrydockPolicy(object):
# Data Management Policy
data_rules = [
policy.DocumentedRuleDefault('physical_provisioner:read_data',
'role:admin',
'Read loaded design data', [{
'path':
'/api/v1.0/designs',
'method':
'GET'
}, {
'path':
'/api/v1.0/designs/{design_id}',
'method':
'GET'
}]),
policy.DocumentedRuleDefault('physical_provisioner:ingest_data',
'role:admin', 'Load design data', [{
'path':
'/api/v1.0/designs',
'method':
'POST'
}, {
'path':
'/api/v1.0/designs/{design_id}/parts',
'method':
'POST'
}])
policy.DocumentedRuleDefault(
'physical_provisioner:read_data', 'role:admin',
'Read loaded design data',
[{
'path': '/api/v1.0/designs',
'method': 'GET'
}, {
'path': '/api/v1.0/designs/{design_id}',
'method': 'GET'
}]),
policy.DocumentedRuleDefault(
'physical_provisioner:ingest_data', 'role:admin',
'Load design data',
[{
'path': '/api/v1.0/designs',
'method': 'POST'
}, {
'path': '/api/v1.0/designs/{design_id}/parts',
'method': 'POST'
}])
]
def __init__(self):



@ -1,6 +1,6 @@
"""Definitions for Drydock database tables."""
from sqlalchemy.schema import Table, Column, Sequence
from sqlalchemy.schema import Table, Column
from sqlalchemy.types import Boolean, DateTime, String, Integer
from sqlalchemy.dialects import postgresql as pg
@ -25,6 +25,9 @@ class Tasks(ExtendTable):
Column('result_message', String(128)),
Column('result_reason', String(128)),
Column('result_error_count', Integer),
Column('result_successes', pg.ARRAY(String(32))),
Column('result_failures', pg.ARRAY(String(32))),
Column('retry', Integer),
Column('status', String(32)),
Column('created', DateTime),
Column('created_by', String(16)),
@ -33,7 +36,8 @@ class Tasks(ExtendTable):
Column('request_context', pg.JSON),
Column('action', String(32)),
Column('terminated', DateTime),
Column('terminated_by', String(16))
Column('terminated_by', String(16)),
Column('terminate', Boolean, default=False)
]
@ -47,7 +51,7 @@ class ResultMessage(ExtendTable):
Column('task_id', pg.BYTEA(16)),
Column('message', String(128)),
Column('error', Boolean),
Column('context', String(32)),
Column('context', String(64)),
Column('context_type', String(16)),
Column('ts', DateTime),
Column('extra', pg.JSON)


@ -0,0 +1,14 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for resolving design references."""


@ -0,0 +1,97 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for resolving design references."""
import urllib.parse
import requests
from drydock_provisioner import error as errors
class DesignResolver(object):
"""Class for handling different design references to resolve them to a design document."""
def __init__(self):
self.scheme_handlers = {
'http': self.resolve_reference_http,
'file': self.resolve_reference_file,
'https': self.resolve_reference_http,
'deckhand+http': self.resolve_reference_deckhand,
}
def resolve_reference(self, design_ref):
"""Resolve a reference to a design document.
Locate a schema handler based on the URI scheme of the design reference
and use that handler to get the design document referenced.
:param design_ref: A URI-formatted reference to a design document
"""
try:
design_uri = urllib.parse.urlparse(design_ref)
handler = self.scheme_handlers.get(design_uri.scheme, None)
if handler is None:
raise errors.InvalidDesignReference(
"Invalid reference scheme %s: no handler." %
design_uri.scheme)
else:
return handler(design_uri)
except ValueError:
raise errors.InvalidDesignReference(
"Cannot resolve design reference %s: unable to parse as valid URI."
% design_ref)
def resolve_reference_http(self, design_uri):
"""Retrieve design documents from http/https endpoints.
Return a byte array of the design document. Supports unsecured
access or basic auth.
:param design_uri: Tuple as returned by urllib.parse for the design reference
"""
if design_uri.username is not None and design_uri.password is not None:
response = requests.get(
design_uri.geturl(),
auth=(design_uri.username, design_uri.password),
timeout=30)
else:
response = requests.get(design_uri.geturl(), timeout=30)
return response.content
def resolve_reference_file(self, design_uri):
"""Retrieve design documents from local file endpoints.
Return a byte array of the design document.
:param design_uri: Tuple as returned by urllib.parse for the design reference
"""
if design_uri.path != '':
with open(design_uri.path, 'rb') as f:
doc = f.read()
return doc
else:
raise errors.InvalidDesignReference(
"File reference %s has an empty path." % design_uri.geturl())
def resolve_reference_deckhand(self, design_uri):
"""Retrieve design documents from Deckhand endpoints.
Return a byte array of the design document. Assumes Keystone
authentication required.
:param design_uri: Tuple as returned by urllib.parse for the design reference
"""
raise errors.InvalidDesignReference(
"Deckhand references not currently supported.")


@ -14,18 +14,20 @@
"""Access methods for managing external data access and persistence."""
import logging
from datetime import datetime
from sqlalchemy import create_engine
from sqlalchemy import MetaData
from sqlalchemy import sql
from sqlalchemy import MetaData
import drydock_provisioner.objects as objects
import drydock_provisioner.objects.fields as hd_fields
from .db import tables
from .design import resolver
from drydock_provisioner import config
from drydock_provisioner.error import DesignError
from drydock_provisioner.error import StateError
@ -34,110 +36,42 @@ class DrydockState(object):
self.logger = logging.getLogger(
config.config_mgr.conf.logging.global_logger_name)
self.resolver = resolver.DesignResolver()
return
def connect_db(self):
"""Connect the state manager to the persistent DB."""
self.db_engine = create_engine(
config.config_mgr.conf.database.database_connect_string)
self.db_metadata = MetaData()
self.db_metadata = MetaData(bind=self.db_engine)
self.tasks_tbl = tables.Tasks(self.db_metadata)
self.result_message_tbl = tables.ResultMessage(self.db_metadata)
self.active_instance_tbl = tables.ActiveInstance(self.db_metadata)
self.build_data_tbl = tables.BuildData(self.db_metadata)
return
# TODO(sh8121att) Need to lock a design base or change once implementation
# has started
def get_design(self, design_id):
if design_id not in self.designs.keys():
def tabularasa(self):
"""Truncate all tables.
raise DesignError("Design ID %s not found" % (design_id))
Used for testing to truncate all tables so the database is clean.
"""
table_names = [
'tasks',
'result_message',
'active_instance',
'build_data',
]
return objects.SiteDesign.obj_from_primitive(self.designs[design_id])
conn = self.db_engine.connect()
for t in table_names:
query_text = sql.text(
"TRUNCATE TABLE %s" % t).execution_options(autocommit=True)
conn.execute(query_text)
conn.close()
def post_design(self, site_design):
if site_design is not None:
my_lock = self.designs_lock.acquire(blocking=True, timeout=10)
if my_lock:
design_id = site_design.id
if design_id not in self.designs.keys():
self.designs[design_id] = site_design.obj_to_primitive()
else:
self.designs_lock.release()
raise StateError("Design ID %s already exists" % design_id)
self.designs_lock.release()
return True
raise StateError("Could not acquire lock")
else:
raise DesignError("Design change must be a SiteDesign instance")
def put_design(self, site_design):
if site_design is not None:
my_lock = self.designs_lock.acquire(blocking=True, timeout=10)
if my_lock:
design_id = site_design.id
if design_id not in self.designs.keys():
self.designs_lock.release()
raise StateError("Design ID %s does not exist" % design_id)
else:
self.designs[design_id] = site_design.obj_to_primitive()
self.designs_lock.release()
return True
raise StateError("Could not acquire lock")
else:
raise DesignError("Design base must be a SiteDesign instance")
def get_current_build(self):
latest_stamp = 0
current_build = None
for b in self.builds:
if b.build_id > latest_stamp:
latest_stamp = b.build_id
current_build = b
return deepcopy(current_build)
def get_build(self, build_id):
for b in self.builds:
if b.build_id == build_id:
return b
return None
def post_build(self, site_build):
if site_build is not None and isinstance(site_build, SiteBuild):
my_lock = self.builds_lock.acquire(block=True, timeout=10)
if my_lock:
exists = [
b for b in self.builds if b.build_id == site_build.build_id
]
if len(exists) > 0:
self.builds_lock.release()
raise DesignError("Already a site build with ID %s" %
(str(site_build.build_id)))
self.builds.append(deepcopy(site_build))
self.builds_lock.release()
return True
raise StateError("Could not acquire lock")
else:
raise DesignError("Design change must be a SiteDesign instance")
def put_build(self, site_build):
if site_build is not None and isinstance(site_build, SiteBuild):
my_lock = self.builds_lock.acquire(block=True, timeout=10)
if my_lock:
buildid = site_build.buildid
for b in self.builds:
if b.buildid == buildid:
b.merge_updates(site_build)
self.builds_lock.release()
return True
self.builds_lock.release()
return False
raise StateError("Could not acquire lock")
else:
raise DesignError("Design change must be a SiteDesign instance")
def get_design_documents(self, design_ref):
return self.resolver.resolve_reference(design_ref)
def get_tasks(self):
"""Get all tasks in the database."""
@ -150,6 +84,10 @@ class DrydockState(object):
self._assemble_tasks(task_list=task_list)
# add reference to this state manager to each task
for t in task_list:
t.statemgr = self
conn.close()
return task_list
@ -157,6 +95,107 @@ class DrydockState(object):
self.logger.error("Error querying task list: %s" % str(ex))
return []
def get_complete_subtasks(self, task_id):
"""Query database for subtasks of the provided task that are complete.
Complete is defined as status of Terminated or Complete.
:param task_id: uuid.UUID ID of the parent task for subtasks
"""
try:
conn = self.db_engine.connect()
query_text = sql.text(
"SELECT * FROM tasks WHERE " # nosec no strings are user-sourced
"parent_task_id = :parent_task_id AND "
"status IN ('" + hd_fields.TaskStatus.Terminated + "','" +
hd_fields.TaskStatus.Complete + "')")
rs = conn.execute(query_text, parent_task_id=task_id.bytes)
task_list = [objects.Task.from_db(dict(r)) for r in rs]
conn.close()
self._assemble_tasks(task_list=task_list)
for t in task_list:
t.statemgr = self
return task_list
except Exception as ex:
self.logger.error("Error querying complete subtasks: %s" % str(ex))
return []
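The status filter above is built by concatenating trusted enum constants, hence the `#nosec` annotation. The same terminal-status filter can also be expressed with SQLAlchemy's `in_()` operator, which parameterizes the values. A hedged sketch against an in-memory SQLite table (column values and status strings here are illustrative, not Drydock's real schema):

```python
# Equivalent "subtasks in a terminal status" query using in_() instead
# of string concatenation; SQLAlchemy binds the status list as
# parameters rather than splicing it into the SQL text.
import sqlalchemy as sa

engine = sa.create_engine("sqlite://")
meta = sa.MetaData()
tasks = sa.Table("tasks", meta,
                 sa.Column("task_id", sa.String),
                 sa.Column("parent_task_id", sa.String),
                 sa.Column("status", sa.String))
meta.create_all(engine)

conn = engine.connect()
conn.execute(tasks.insert(), [
    {"task_id": "a", "parent_task_id": "p", "status": "complete"},
    {"task_id": "b", "parent_task_id": "p", "status": "running"},
])

query = tasks.select().where(
    sa.and_(tasks.c.parent_task_id == "p",
            tasks.c.status.in_(["terminated", "complete"])))
rows = conn.execute(query).fetchall()
print([r[0] for r in rows])  # ['a']
```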
def get_active_subtasks(self, task_id):
"""Query database for subtasks of the provided task that are active.
Active is defined as any status other than Terminated or Complete.
Returns a list of objects.Task instances.
:param task_id: uuid.UUID ID of the parent task for subtasks
"""
try:
conn = self.db_engine.connect()
query_text = sql.text(
"SELECT * FROM tasks WHERE " # nosec no strings are user-sourced
"parent_task_id = :parent_task_id AND "
"status NOT IN ('" + hd_fields.TaskStatus.Terminated + "','" +
hd_fields.TaskStatus.Complete + "')")
rs = conn.execute(query_text, parent_task_id=task_id.bytes)
task_list = [objects.Task.from_db(dict(r)) for r in rs]
conn.close()
self._assemble_tasks(task_list=task_list)
for t in task_list:
t.statemgr = self
return task_list
except Exception as ex:
self.logger.error("Error querying active subtasks: %s" % str(ex))
return []
def get_next_queued_task(self, allowed_actions=None):
"""Query the database for the next (by creation timestamp) queued task.
If specified, only select tasks for one of the actions in the allowed_actions
list.
:param allowed_actions: list of string action names
"""
try:
conn = self.db_engine.connect()
if allowed_actions is None:
query = self.tasks_tbl.select().where(
self.tasks_tbl.c.status ==
hd_fields.TaskStatus.Queued).order_by(
self.tasks_tbl.c.created.asc())
rs = conn.execute(query)
else:
query = sql.text("SELECT * FROM tasks WHERE "
"status = :queued_status AND "
"action = ANY(:actions) "
"ORDER BY created ASC")
rs = conn.execute(
query,
queued_status=hd_fields.TaskStatus.Queued,
actions=allowed_actions)
r = rs.first()
conn.close()
if r is not None:
task = objects.Task.from_db(dict(r))
self._assemble_tasks(task_list=[task])
task.statemgr = self
return task
else:
return None
except Exception as ex:
self.logger.error(
"Error querying for next queued task: %s" % str(ex),
exc_info=True)
return None
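The selection above is simply "oldest row in Queued status wins". A runnable sketch of that ordering against an in-memory SQLite table (column names and status strings are illustrative; the real query additionally filters on `allowed_actions` via Postgres `ANY()`):

```python
# Pick the next queued task: filter on status, order by creation
# timestamp ascending, take the first row.
import datetime
import sqlalchemy as sa

engine = sa.create_engine("sqlite://")
meta = sa.MetaData()
tasks = sa.Table("tasks", meta,
                 sa.Column("task_id", sa.String),
                 sa.Column("status", sa.String),
                 sa.Column("created", sa.DateTime))
meta.create_all(engine)

now = datetime.datetime(2017, 9, 28, 12, 0, 0)
conn = engine.connect()
conn.execute(tasks.insert(), [
    {"task_id": "newer", "status": "queued",
     "created": now + datetime.timedelta(seconds=30)},
    {"task_id": "older", "status": "queued", "created": now},
    {"task_id": "done", "status": "complete", "created": now},
])

query = (tasks.select()
         .where(tasks.c.status == "queued")
         .order_by(tasks.c.created.asc()))
row = conn.execute(query).first()
print(row[0])  # older
```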
def get_task(self, task_id):
"""Query database for task matching task_id.
@ -172,16 +211,19 @@ class DrydockState(object):
task = objects.Task.from_db(dict(r))
self.logger.debug("Assembling result messages for task %s." % str(task.task_id))
self.logger.debug(
"Assembling result messages for task %s." % str(task.task_id))
self._assemble_tasks(task_list=[task])
task.statemgr = self
conn.close()
return task
except Exception as ex:
self.logger.error("Error querying task %s: %s" % (str(task_id),
str(ex)), exc_info=True)
self.logger.error(
"Error querying task %s: %s" % (str(task_id), str(ex)),
exc_info=True)
return None
def post_result_message(self, task_id, msg):
@ -192,12 +234,15 @@ class DrydockState(object):
"""
try:
conn = self.db_engine.connect()
query = self.result_message_tbl.insert().values(task_id=task_id.bytes, **(msg.to_db()))
query = self.result_message_tbl.insert().values(
task_id=task_id.bytes, **(msg.to_db()))
conn.execute(query)
conn.close()
return True
except Exception as ex:
self.logger.error("Error inserting result message for task %s: %s" % (str(task_id), str(ex)))
self.logger.error(
"Error inserting result message for task %s: %s" %
(str(task_id), str(ex)))
return False
def _assemble_tasks(self, task_list=None):
@ -209,9 +254,10 @@ class DrydockState(object):
return None
conn = self.db_engine.connect()
query = sql.select([self.result_message_tbl]).where(
self.result_message_tbl.c.task_id == sql.bindparam(
'task_id')).order_by(self.result_message_tbl.c.sequence.asc())
query = sql.select([
self.result_message_tbl
]).where(self.result_message_tbl.c.task_id == sql.bindparam(
'task_id')).order_by(self.result_message_tbl.c.sequence.asc())
query.compile(self.db_engine)
for t in task_list:
@ -235,7 +281,8 @@ class DrydockState(object):
"""
try:
conn = self.db_engine.connect()
query = self.tasks_tbl.insert().values(**(task.to_db(include_id=True)))
query = self.tasks_tbl.insert().values(
**(task.to_db(include_id=True)))
conn.execute(query)
conn.close()
return True
@ -251,9 +298,9 @@ class DrydockState(object):
"""
try:
conn = self.db_engine.connect()
query = self.tasks_tbl.update(
**(task.to_db(include_id=False))).where(
self.tasks_tbl.c.task_id == task.task_id.bytes)
query = self.tasks_tbl.update().where(
self.tasks_tbl.c.task_id == task.task_id.bytes).values(
**(task.to_db(include_id=False)))
rs = conn.execute(query)
if rs.rowcount == 1:
conn.close()
@ -272,13 +319,17 @@ class DrydockState(object):
:param task_id: uuid.UUID parent task ID
:param subtask_id: uuid.UUID new subtask ID
"""
query_string = sql.text("UPDATE tasks "
"SET subtask_id_list = array_append(subtask_id_list, :new_subtask) "
"WHERE task_id = :task_id").execution_options(autocommit=True)
query_string = sql.text(
"UPDATE tasks "
"SET subtask_id_list = array_append(subtask_id_list, :new_subtask) "
"WHERE task_id = :task_id").execution_options(autocommit=True)
try:
conn = self.db_engine.connect()
rs = conn.execute(query_string, new_subtask=subtask_id.bytes, task_id=task_id.bytes)
rs = conn.execute(
query_string,
new_subtask=subtask_id.bytes,
task_id=task_id.bytes)
rc = rs.rowcount
conn.close()
if rc == 1:
@ -286,10 +337,31 @@ class DrydockState(object):
else:
return False
except Exception as ex:
self.logger.error("Error appending subtask %s to task %s: %s"
% (str(subtask_id), str(task_id), str(ex)))
self.logger.error("Error appending subtask %s to task %s: %s" %
(str(subtask_id), str(task_id), str(ex)))
return False
def maintain_leadership(self, leader_id):
"""The active leader reaffirms its existence.
:param leader_id: uuid.UUID ID of the leader
"""
try:
conn = self.db_engine.connect()
query = self.active_instance_tbl.update().where(
self.active_instance_tbl.c.identity == leader_id.bytes).values(
last_ping=datetime.utcnow())
rs = conn.execute(query)
rc = rs.rowcount
conn.close()
if rc == 1:
return True
else:
return False
except Exception as ex:
self.logger.error("Error maintaining leadership: %s" % str(ex))
return False
def claim_leadership(self, leader_id):
"""Claim active instance status for leader_id.
@ -303,19 +375,24 @@ class DrydockState(object):
:param leader_id: a uuid.UUID instance identifying the instance to be considered active
"""
query_string = sql.text("INSERT INTO active_instance (dummy_key, identity, last_ping) "
"VALUES (1, :instance_id, timezone('UTC', now())) "
"ON CONFLICT (dummy_key) DO UPDATE SET "
"identity = :instance_id "
"WHERE active_instance.last_ping < (now() - interval '%d seconds')"
% (config.config_mgr.conf.default.leader_grace_period)).execution_options(autocommit=True)
query_string = sql.text( # nosec no strings are user-sourced
"INSERT INTO active_instance (dummy_key, identity, last_ping) "
"VALUES (1, :instance_id, timezone('UTC', now())) "
"ON CONFLICT (dummy_key) DO UPDATE SET "
"identity = :instance_id "
"WHERE active_instance.last_ping < (now() - interval '%d seconds')"
% (config.config_mgr.conf.leader_grace_period
)).execution_options(autocommit=True)
try:
conn = self.db_engine.connect()
rs = conn.execute(query_string, instance_id=leader_id.bytes)
rc = rs.rowcount
conn.execute(query_string, instance_id=leader_id.bytes)
check_query = self.active_instance_tbl.select().where(
self.active_instance_tbl.c.identity == leader_id.bytes)
rs = conn.execute(check_query)
r = rs.fetchone()
conn.close()
if rc == 1:
if r is not None:
return True
else:
return False
@ -323,6 +400,26 @@ class DrydockState(object):
self.logger.error("Error executing leadership claim: %s" % str(ex))
return False
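The claim works because `active_instance` is constrained to a single row (the fixed `dummy_key`) and the upsert's `UPDATE` only fires when the incumbent's heartbeat is stale. A hedged sketch of the same pattern using SQLite's compatible upsert syntax (requires SQLite 3.24+; the grace period and the choice to also refresh `last_ping` on takeover are illustrative, not Drydock's exact statement):

```python
# Single-row leadership claim: INSERT succeeds on an empty table;
# otherwise ON CONFLICT only replaces the incumbent when its last_ping
# is older than the grace period.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE active_instance "
             "(dummy_key INTEGER PRIMARY KEY, identity TEXT, last_ping REAL)")

claim = ("INSERT INTO active_instance (dummy_key, identity, last_ping) "
         "VALUES (1, ?, ?) "
         "ON CONFLICT (dummy_key) DO UPDATE SET "
         "identity = excluded.identity, last_ping = excluded.last_ping "
         "WHERE active_instance.last_ping < ?")

GRACE = 15.0  # illustrative grace period in seconds

def claim_leadership(instance, now):
    conn.execute(claim, (instance, now, now - GRACE))
    row = conn.execute("SELECT identity FROM active_instance").fetchone()
    return row[0] == instance

first = claim_leadership("leader-a", 100.0)   # empty table: claim lands
second = claim_leadership("leader-b", 105.0)  # incumbent is fresh: denied
third = claim_leadership("leader-b", 120.0)   # heartbeat stale (>15s): usurped
print(first, second, third)  # True False True
```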
def abdicate_leadership(self, leader_id):
"""Give up leadership for ``leader_id``.
:param leader_id: a uuid.UUID instance identifying the instance giving up leadership
"""
try:
conn = self.db_engine.connect()
query = self.active_instance_tbl.delete().where(
self.active_instance_tbl.c.identity == leader_id.bytes)
rs = conn.execute(query)
rc = rs.rowcount
conn.close()
if rc == 1:
return True
else:
return False
except Exception as ex:
self.logger.error("Error abdicating leadership: %s" % str(ex))
return False
def post_promenade_part(self, part):
my_lock = self.promenade_lock.acquire(blocking=True, timeout=10)
if my_lock:

View File

@ -18,3 +18,4 @@ keystoneauth1==2.13.0
alembic==0.8.2
sqlalchemy==1.1.14
psycopg2==2.7.3.1
jsonschema==2.6.0

View File

@ -1,3 +1,4 @@
alembic==0.8.2
amqp==2.2.2
Babel==2.5.1
bson==0.4.7
@ -16,24 +17,26 @@ greenlet==0.4.12
idna==2.6
iso8601==0.1.11
Jinja2==2.9.6
jsonschema==2.6.0
keystoneauth1==2.13.0
keystonemiddleware==4.9.1
kombu==4.1.0
Mako==1.0.7
MarkupSafe==1.0
monotonic==1.3
msgpack-python==0.4.8
netaddr==0.7.19
netifaces==0.10.6
oauthlib==2.0.4
oauthlib==2.0.6
oslo.concurrency==3.23.0
oslo.config==3.16.0
oslo.context==2.19.1
oslo.context==2.19.2
oslo.i18n==3.18.0
oslo.log==3.31.0
oslo.messaging==5.33.0
oslo.log==3.32.0
oslo.messaging==5.33.1
oslo.middleware==3.32.1
oslo.policy==1.22.1
oslo.serialization==2.21.1
oslo.serialization==2.21.2
oslo.service==1.26.0
oslo.utils==3.30.0
oslo.versionedobjects==1.23.0
@ -45,6 +48,7 @@ pika-pool==0.1.3
pip==9.0.1
positional==1.2.1
prettytable==0.7.2
psycopg2==2.7.3.1
PTable==0.9.2
pycadf==2.6.0
pycrypto==2.6.1
@ -52,6 +56,7 @@ pyghmi==1.0.18
pyinotify==0.9.6
pyparsing==2.2.0
python-dateutil==2.6.1
python-editor==1.0.3
python-keystoneclient==3.13.0
python-mimeparse==1.6.0
pytz==2017.2
@ -62,9 +67,10 @@ rfc3986==1.1.0
Routes==2.4.1
setuptools==36.6.0
six==1.11.0
SQLAlchemy==1.1.14
statsd==3.2.1
stevedore==1.27.1
tenacity==4.4.0
tenacity==4.5.0
urllib3==1.22
uWSGI==2.0.15
vine==1.1.4

View File

@ -16,42 +16,19 @@
# and monitor the provisioning of those hosts and execution of bootstrap
# scripts
from setuptools import setup
from setuptools import setup, find_packages
setup(
name='drydock_provisioner',
version='0.1a1',
description='Bootstrapper for Kubernetes infrastructure',
url='http://github.com/att-comdev/drydock',
author='Scott Hussey - AT&T',
author_email='sh8121@att.com',
author='AT&T - AIC UCP Developers',
license='Apache 2.0',
packages=[
'drydock_provisioner',
'drydock_provisioner.objects',
'drydock_provisioner.ingester',
'drydock_provisioner.ingester.plugins',
'drydock_provisioner.statemgmt',
'drydock_provisioner.orchestrator',
'drydock_provisioner.control',
'drydock_provisioner.drivers',
'drydock_provisioner.drivers.oob',
'drydock_provisioner.drivers.oob.pyghmi_driver',
'drydock_provisioner.drivers.oob.manual_driver',
'drydock_provisioner.drivers.node',
'drydock_provisioner.drivers.node.maasdriver',
'drydock_provisioner.drivers.node.maasdriver.models',
'drydock_provisioner.control',
'drydock_provisioner.cli',
'drydock_provisioner.cli.design',
'drydock_provisioner.cli.part',
'drydock_provisioner.cli.task',
'drydock_provisioner.drydock_client',
'drydock_provisioner.statemgmt.db',
'drydock_provisioner.cli.node',
],
packages=find_packages(),
package_data={
'': ['schemas/*.yaml'],
},
entry_points={
'oslo.config.opts':
'drydock_provisioner = drydock_provisioner.config:list_opts',

View File

@ -0,0 +1,68 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Shared fixtures used by integration tests."""
import logging
from oslo_config import cfg
import drydock_provisioner.config as config
import drydock_provisioner.objects as objects
from drydock_provisioner.statemgmt.state import DrydockState
import pytest
@pytest.fixture()
def blank_state(drydock_state):
drydock_state.tabularasa()
return drydock_state
@pytest.fixture(scope='session')
def drydock_state(setup):
state_mgr = DrydockState()
state_mgr.connect_db()
return state_mgr
@pytest.fixture(scope='session')
def setup():
objects.register_all()
logging.basicConfig(level='DEBUG')
req_opts = {
'default':
[cfg.IntOpt('leader_grace_period'),
cfg.IntOpt('poll_interval')],
'database': [cfg.StrOpt('database_connect_string')],
'logging': [
cfg.StrOpt('global_logger_name', default='drydock'),
]
}
for k, v in req_opts.items():
config.config_mgr.conf.register_opts(v, group=k)
config.config_mgr.conf([])
config.config_mgr.conf.set_override(
name="database_connect_string",
group="database",
override="postgresql+psycopg2://drydock:drydock@localhost:5432/drydock"
)
config.config_mgr.conf.set_override(
name="leader_grace_period", group="default", override=15)
config.config_mgr.conf.set_override(
name="poll_interval", group="default", override=3)
return

View File

@ -11,32 +11,28 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test tasks API with Postgres backend."""
import uuid
import logging
import json
from drydock_provisioner import policy
from drydock_provisioner.orchestrator import Orchestrator
from drydock_provisioner.orchestrator.orchestrator import Orchestrator
from drydock_provisioner.control.base import DrydockRequestContext, BaseResource
from drydock_provisioner.control.tasks import TaskResource, TasksResource
from drydock_provisioner.control.base import DrydockRequestContext
from drydock_provisioner.control.tasks import TasksResource
import pytest
import falcon
logging.basicConfig(level=logging.DEBUG)
class TestTasksApi():
def test_read_tasks(self, mocker):
def test_read_tasks(self, mocker, blank_state):
''' DrydockPolicy.authorized() should correctly use oslo_policy to enforce
RBAC policy based on a DrydockRequestContext instance
'''
mocker.patch('oslo_policy.policy.Enforcer')
state = mocker.MagicMock()
ctx = DrydockRequestContext()
policy_engine = policy.DrydockPolicy()
@ -44,7 +40,7 @@ class TestTasksApi():
policy_mock_config = {'authorize.return_value': True}
policy_engine.enforcer.configure_mock(**policy_mock_config)
api = TasksResource(state_manager=state)
api = TasksResource(state_manager=blank_state)
# Configure context
project_id = str(uuid.uuid4().hex)
@ -61,33 +57,29 @@ class TestTasksApi():
api.on_get(req, resp)
expected_calls = [mocker.call.tasks]
assert state.has_calls(expected_calls)
assert resp.status == falcon.HTTP_200
def test_create_task(self, mocker):
def test_create_task(self, mocker, blank_state):
mocker.patch('oslo_policy.policy.Enforcer')
state = mocker.MagicMock()
ingester = mocker.MagicMock()
orch = mocker.MagicMock(
spec=Orchestrator, wraps=Orchestrator(state_manager=state))
orch_mock_config = {'execute_task.return_value': True}
orch.configure_mock(**orch_mock_config)
spec=Orchestrator,
wraps=Orchestrator(state_manager=blank_state, ingester=ingester))
ctx = DrydockRequestContext()
policy_engine = policy.DrydockPolicy()
json_body = json.dumps({
'action': 'verify_site',
'design_id': 'foo',
'design_ref': 'http://foo.com',
}).encode('utf-8')
# Mock policy enforcement
policy_mock_config = {'authorize.return_value': True}
policy_engine.enforcer.configure_mock(**policy_mock_config)
api = TasksResource(orchestrator=orch, state_manager=state)
api = TasksResource(orchestrator=orch, state_manager=blank_state)
# Configure context
project_id = str(uuid.uuid4().hex)

View File

@ -11,8 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import drydock_provisioner.config as config
import drydock_provisioner.drivers.node.maasdriver.api_client as client
@ -25,7 +23,9 @@ class TestClass(object):
client_config['api_key'])
resp = maas_client.get(
'account/', params={'op': 'list_authorisation_tokens'})
'account/', params={
'op': 'list_authorisation_tokens'
})
parsed = resp.json()

View File

@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import uuid
import drydock_provisioner.config as config

View File

@ -0,0 +1,75 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Generic testing for the orchestrator."""
import threading
import time
import drydock_provisioner.orchestrator.orchestrator as orch
import drydock_provisioner.objects.fields as hd_fields
from drydock_provisioner.ingester.ingester import Ingester
class TestClass(object):
def test_task_complete(self, setup, blank_state):
ingester = Ingester()
ingester.enable_plugin(
'drydock_provisioner.ingester.plugins.yaml.YamlIngester')
orchestrator = orch.Orchestrator(
state_manager=blank_state, ingester=ingester)
orch_task = orchestrator.create_task(
action=hd_fields.OrchestratorAction.Noop)
orch_task.set_status(hd_fields.TaskStatus.Queued)
orch_task.save()
orch_thread = threading.Thread(target=orchestrator.watch_for_tasks)
orch_thread.start()
try:
time.sleep(10)
orch_task = blank_state.get_task(orch_task.get_id())
assert orch_task.get_status() == hd_fields.TaskStatus.Complete
finally:
orchestrator.stop_orchestrator()
orch_thread.join(10)
def test_task_termination(self, setup, blank_state):
ingester = Ingester()
ingester.enable_plugin(
'drydock_provisioner.ingester.plugins.yaml.YamlIngester')
orchestrator = orch.Orchestrator(
state_manager=blank_state, ingester=ingester)
orch_task = orchestrator.create_task(
action=hd_fields.OrchestratorAction.Noop)
orch_task.set_status(hd_fields.TaskStatus.Queued)
orch_task.save()
orch_thread = threading.Thread(target=orchestrator.watch_for_tasks)
orch_thread.start()
try:
time.sleep(2)
orchestrator.terminate_task(orch_task)
time.sleep(10)
orch_task = blank_state.get_task(orch_task.get_id())
assert orch_task.get_status() == hd_fields.TaskStatus.Terminated
finally:
orchestrator.stop_orchestrator()
orch_thread.join(10)
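Both tests above run `watch_for_tasks` on a background thread and rely on `stop_orchestrator` to end the loop. A hedged sketch of that stoppable poll-loop shape (the `Worker` class and its internals are illustrative, not Drydock's implementation):

```python
# A worker thread repeatedly consumes queued work until a stop event is
# set; the orchestrator's watch loop follows the same pattern against
# the database queue.
import threading
import time

class Worker:
    def __init__(self, poll_interval=0.05):
        self.poll_interval = poll_interval
        self._stop = threading.Event()
        self.handled = []

    def watch_for_tasks(self, queue):
        while not self._stop.is_set():
            if queue:
                # "Handle" the next queued task.
                self.handled.append(queue.pop(0))
            else:
                # Nothing queued: sleep before polling again.
                time.sleep(self.poll_interval)

    def stop(self):
        self._stop.set()

queue = ["task-1", "task-2"]
w = Worker()
t = threading.Thread(target=w.watch_for_tasks, args=(queue,))
t.start()
time.sleep(0.3)   # let the worker drain the queue
w.stop()
t.join(1)
print(w.handled)  # ['task-1', 'task-2']
```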

View File

@ -1,105 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import pytest
import shutil
import os
import uuid
import drydock_provisioner.config as config
import drydock_provisioner.drivers.node.maasdriver.api_client as client
import drydock_provisioner.ingester.plugins.yaml
import drydock_provisioner.statemgmt as statemgmt
import drydock_provisioner.objects as objects
import drydock_provisioner.orchestrator as orch
import drydock_provisioner.objects.fields as hd_fields
import drydock_provisioner.objects.task as task
import drydock_provisioner.drivers as drivers
from drydock_provisioner.ingester import Ingester
class TestClass(object):
def test_client_verify(self):
design_state = statemgmt.DesignState()
orchestrator = orch.Orchestrator(
state_manager=design_state,
enabled_drivers={
'node':
'drydock_provisioner.drivers.node.maasdriver.driver.MaasNodeDriver'
})
orch_task = orchestrator.create_task(
task.OrchestratorTask,
site='sitename',
design_id=None,
action=hd_fields.OrchestratorAction.VerifySite)
orchestrator.execute_task(orch_task.get_id())
orch_task = design_state.get_task(orch_task.get_id())
assert orch_task.result == hd_fields.ActionResult.Success
def test_orch_preparesite(self, input_files):
objects.register_all()
input_file = input_files.join("fullsite.yaml")
design_state = statemgmt.DesignState()
design_data = objects.SiteDesign()
design_id = design_data.assign_id()
design_state.post_design(design_data)
ingester = Ingester()
ingester.enable_plugins(
[drydock_provisioner.ingester.plugins.yaml.YamlIngester])
ingester.ingest_data(
plugin_name='yaml',
design_state=design_state,
filenames=[str(input_file)],
design_id=design_id)
design_data = design_state.get_design(design_id)
orchestrator = orch.Orchestrator(
state_manager=design_state,
enabled_drivers={
'node':
'drydock_provisioner.drivers.node.maasdriver.driver.MaasNodeDriver'
})
orch_task = orchestrator.create_task(
task.OrchestratorTask,
site='sitename',
design_id=design_id,
action=hd_fields.OrchestratorAction.PrepareSite)
orchestrator.execute_task(orch_task.get_id())
orch_task = design_state.get_task(orch_task.get_id())
assert orch_task.result == hd_fields.ActionResult.Success
@pytest.fixture(scope='module')
def input_files(self, tmpdir_factory, request):
tmpdir = tmpdir_factory.mktemp('data')
samples_dir = os.path.dirname(str(request.fspath)) + "/../yaml_samples"
samples = os.listdir(samples_dir)
for f in samples:
src_file = samples_dir + "/" + f
dst_file = str(tmpdir) + "/" + f
shutil.copyfile(src_file, dst_file)
return tmpdir

View File

@ -1,26 +1,30 @@
import pytest
import logging
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test orchestrator leadership organization via Postgres."""
import uuid
import time
from oslo_config import cfg
import drydock_provisioner.objects as objects
import drydock_provisioner.config as config
from drydock_provisioner.statemgmt.state import DrydockState
class TestPostgres(object):
def test_claim_leadership(self, setup):
def test_claim_leadership(self, blank_state):
"""Test that a node can claim leadership.
First test claiming leadership with an empty table, simulating startup
Second test that an immediate follow-up claim is denied
Third test that a usurping claim after the grace period succeeds
"""
ds = DrydockState()
ds = blank_state
first_leader = uuid.uuid4()
second_leader = uuid.uuid4()
@ -28,12 +32,12 @@ class TestPostgres(object):
print("Claiming leadership for %s" % str(first_leader.bytes))
crown = ds.claim_leadership(first_leader)
assert crown == True
assert crown
print("Claiming leadership for %s" % str(second_leader.bytes))
crown = ds.claim_leadership(second_leader)
assert crown == False
assert crown is False
time.sleep(20)
@ -41,31 +45,4 @@ class TestPostgres(object):
"Claiming leadership for %s after 20s" % str(second_leader.bytes))
crown = ds.claim_leadership(second_leader)
assert crown == True
@pytest.fixture(scope='module')
def setup(self):
objects.register_all()
logging.basicConfig()
req_opts = {
'default': [cfg.IntOpt('leader_grace_period')],
'database': [cfg.StrOpt('database_connect_string')],
'logging': [
cfg.StrOpt('global_logger_name', default='drydock'),
]
}
for k, v in req_opts.items():
config.config_mgr.conf.register_opts(v, group=k)
config.config_mgr.conf([])
config.config_mgr.conf.set_override(
name="database_connect_string",
group="database",
override=
"postgresql+psycopg2://drydock:drydock@localhost:5432/drydock")
config.config_mgr.conf.set_override(
name="leader_grace_period", group="default", override=15)
return
assert crown

View File

@ -1,114 +1,31 @@
import pytest
import logging
import uuid
import time
from oslo_config import cfg
from sqlalchemy import sql
from sqlalchemy import create_engine
from drydock_provisioner import objects
import drydock_provisioner.config as config
from drydock_provisioner.control.base import DrydockRequestContext
from drydock_provisioner.statemgmt.state import DrydockState
class TestPostgres(object):
def test_result_message_insert(self, populateddb, drydockstate):
def test_result_message_insert(self, populateddb, drydock_state):
"""Test that a result message for a task can be added."""
msg1 = objects.TaskStatusMessage('Error 1', True, 'node', 'node1')
msg2 = objects.TaskStatusMessage('Status 1', False, 'node', 'node1')
result = drydockstate.post_result_message(populateddb.task_id, msg1)
result = drydock_state.post_result_message(populateddb.task_id, msg1)
assert result
result = drydockstate.post_result_message(populateddb.task_id, msg2)
result = drydock_state.post_result_message(populateddb.task_id, msg2)
assert result
task = drydockstate.get_task(populateddb.task_id)
task = drydock_state.get_task(populateddb.task_id)
assert task.result.error_count == 1
assert len(task.result.message_list) == 2
@pytest.fixture(scope='function')
def populateddb(self, cleandb):
def populateddb(self, blank_state):
"""Add dummy task to test against."""
task = objects.Task(
action='prepare_site', design_ref='http://test.com/design')
q1 = sql.text('INSERT INTO tasks ' \
'(task_id, created, action, design_ref) ' \
'VALUES (:task_id, :created, :action, :design_ref)').execution_options(autocommit=True)
engine = create_engine(
config.config_mgr.conf.database.database_connect_string)
conn = engine.connect()
conn.execute(
q1,
task_id=task.task_id.bytes,
created=task.created,
action=task.action,
design_ref=task.design_ref)
conn.close()
blank_state.post_task(task)
return task
@pytest.fixture(scope='session')
def drydockstate(self):
return DrydockState()
@pytest.fixture(scope='function')
def cleandb(self, setup):
q1 = sql.text('TRUNCATE TABLE tasks').execution_options(
autocommit=True)
q2 = sql.text('TRUNCATE TABLE result_message').execution_options(
autocommit=True)
q3 = sql.text('TRUNCATE TABLE active_instance').execution_options(
autocommit=True)
q4 = sql.text('TRUNCATE TABLE build_data').execution_options(
autocommit=True)
engine = create_engine(
config.config_mgr.conf.database.database_connect_string)
conn = engine.connect()
conn.execute(q1)
conn.execute(q2)
conn.execute(q3)
conn.execute(q4)
conn.close()
return
@pytest.fixture(scope='module')
def setup(self):
objects.register_all()
logging.basicConfig()
req_opts = {
'default': [cfg.IntOpt('leader_grace_period')],
'database': [cfg.StrOpt('database_connect_string')],
'logging': [
cfg.StrOpt('global_logger_name', default='drydock'),
]
}
for k, v in req_opts.items():
config.config_mgr.conf.register_opts(v, group=k)
config.config_mgr.conf([])
config.config_mgr.conf.set_override(
name="database_connect_string",
group="database",
override=
"postgresql+psycopg2://drydock:drydock@localhost:5432/drydock")
config.config_mgr.conf.set_override(
name="leader_grace_period", group="default", override=15)
return
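The refactor above replaces per-test raw SQL seeding (the old `cleandb`/`populateddb` pair) with calls through the state-manager API. The shape of that change, sketched with a hypothetical in-memory stand-in for `DrydockState` (the real class persists to Postgres):

```python
import uuid

class FakeTask:
    """Hypothetical stand-in for objects.Task, for illustration only."""
    def __init__(self, action, design_ref):
        self.task_id = uuid.uuid4()
        self.action = action
        self.design_ref = design_ref

class FakeState:
    """Hypothetical stand-in for DrydockState: post_task()/get_task()
    replace the hand-written INSERT/TRUNCATE statements above."""
    def __init__(self):
        self._tasks = {}

    def post_task(self, task):
        self._tasks[task.task_id] = task
        return True

    def get_task(self, task_id):
        return self._tasks.get(task_id)

def populateddb(blank_state):
    """Mirrors the refactored fixture: seed through the API, not SQL."""
    task = FakeTask(action='prepare_site', design_ref='http://test.com/design')
    blank_state.post_task(task)
    return task
```

Seeding through the API keeps the fixtures valid when the tasks table schema changes, which the hand-written INSERT column list above would not survive.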


@@ -1,23 +1,29 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test postgres integration for task management."""
import pytest
import logging
import uuid
import time
from oslo_config import cfg
from sqlalchemy import sql
from sqlalchemy import create_engine
from drydock_provisioner import objects
import drydock_provisioner.config as config
from drydock_provisioner.control.base import DrydockRequestContext
from drydock_provisioner.statemgmt.state import DrydockState
class TestPostgres(object):
def test_task_insert(self, cleandb, drydockstate):
def test_task_insert(self, blank_state):
"""Test that a task can be inserted into the database."""
ctx = DrydockRequestContext()
ctx.user = 'sh8121'
@@ -28,113 +34,48 @@ class TestPostgres(object):
design_ref='http://foo.bar/design',
context=ctx)
result = drydockstate.post_task(task)
result = blank_state.post_task(task)
assert result == True
assert result
def test_subtask_append(self, cleandb, drydockstate):
def test_subtask_append(self, blank_state):
"""Test that the atomic subtask append method works."""
task = objects.Task(action='deploy_node', design_ref='http://foobar/design')
subtask = objects.Task(action='deploy_node', design_ref='http://foobar/design', parent_task_id=task.task_id)
task = objects.Task(
action='deploy_node', design_ref='http://foobar/design')
subtask = objects.Task(
action='deploy_node',
design_ref='http://foobar/design',
parent_task_id=task.task_id)
drydockstate.post_task(task)
drydockstate.post_task(subtask)
drydockstate.add_subtask(task.task_id, subtask.task_id)
blank_state.post_task(task)
blank_state.post_task(subtask)
blank_state.add_subtask(task.task_id, subtask.task_id)
test_task = drydockstate.get_task(task.task_id)
test_task = blank_state.get_task(task.task_id)
assert subtask.task_id in test_task.subtask_id_list
def test_task_select(self, populateddb, drydockstate):
def test_task_select(self, populateddb, drydock_state):
"""Test that a task can be selected."""
result = drydockstate.get_task(populateddb.task_id)
result = drydock_state.get_task(populateddb.task_id)
assert result is not None
assert result.design_ref == populateddb.design_ref
def test_task_list(self, populateddb, drydockstate):
def test_task_list(self, populateddb, drydock_state):
"""Test getting a list of all tasks."""
result = drydockstate.get_tasks()
result = drydock_state.get_tasks()
assert len(result) == 1
@pytest.fixture(scope='function')
def populateddb(self, cleandb):
def populateddb(self, blank_state):
"""Add dummy task to test against."""
task = objects.Task(
action='prepare_site', design_ref='http://test.com/design')
q1 = sql.text('INSERT INTO tasks ' \
'(task_id, created, action, design_ref) ' \
'VALUES (:task_id, :created, :action, :design_ref)').execution_options(autocommit=True)
engine = create_engine(
config.config_mgr.conf.database.database_connect_string)
conn = engine.connect()
conn.execute(
q1,
task_id=task.task_id.bytes,
created=task.created,
action=task.action,
design_ref=task.design_ref)
conn.close()
blank_state.post_task(task)
return task
@pytest.fixture(scope='session')
def drydockstate(self):
return DrydockState()
@pytest.fixture(scope='function')
def cleandb(self, setup):
q1 = sql.text('TRUNCATE TABLE tasks').execution_options(
autocommit=True)
q2 = sql.text('TRUNCATE TABLE result_message').execution_options(
autocommit=True)
q3 = sql.text('TRUNCATE TABLE active_instance').execution_options(
autocommit=True)
q4 = sql.text('TRUNCATE TABLE build_data').execution_options(
autocommit=True)
engine = create_engine(
config.config_mgr.conf.database.database_connect_string)
conn = engine.connect()
conn.execute(q1)
conn.execute(q2)
conn.execute(q3)
conn.execute(q4)
conn.close()
return
@pytest.fixture(scope='module')
def setup(self):
objects.register_all()
logging.basicConfig()
req_opts = {
'default': [cfg.IntOpt('leader_grace_period')],
'database': [cfg.StrOpt('database_connect_string')],
'logging': [
cfg.StrOpt('global_logger_name', default='drydock'),
]
}
for k, v in req_opts.items():
config.config_mgr.conf.register_opts(v, group=k)
config.config_mgr.conf([])
config.config_mgr.conf.set_override(
name="database_connect_string",
group="database",
override=
"postgresql+psycopg2://drydock:drydock@localhost:5432/drydock")
config.config_mgr.conf.set_override(
name="leader_grace_period", group="default", override=15)
return


@@ -17,8 +17,6 @@ import logging
from drydock_provisioner import policy
from drydock_provisioner.control.base import DrydockRequestContext
import pytest
logging.basicConfig(level=logging.DEBUG)
@@ -50,10 +48,10 @@ class TestEnforcerDecorator():
self.target_function(req, resp)
expected_calls = [
mocker.call.authorize('physical_provisioner:read_task', {
'project_id': project_id,
'user_id': user_id
}, ctx.to_policy_view())
mocker.call.authorize(
'physical_provisioner:read_task',
{'project_id': project_id,
'user_id': user_id}, ctx.to_policy_view())
]
policy_engine.enforcer.assert_has_calls(expected_calls)


@@ -18,8 +18,6 @@ import sys
from drydock_provisioner.control.base import DrydockRequest
from drydock_provisioner.control.middleware import AuthMiddleware
import pytest
class TestAuthMiddleware():
@@ -96,7 +94,7 @@ class TestAuthMiddleware():
middleware.process_request(request, response)
assert request.context.authenticated == True
assert request.context.authenticated
assert request.context.user_id == user_id
def test_process_request_user_noauth(self):
@@ -115,4 +113,4 @@ class TestAuthMiddleware():
middleware.process_request(request, response)
assert request.context.authenticated == False
assert request.context.authenticated is False


@@ -11,44 +11,40 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from copy import deepcopy
import pytest
import shutil
import os
import drydock_provisioner.ingester.plugins.yaml
import yaml
import logging
from drydock_provisioner.ingester import Ingester
from drydock_provisioner.statemgmt import DesignState
from drydock_provisioner.orchestrator import Orchestrator
from drydock_provisioner.objects.site import SiteDesign
from oslo_config import cfg
import drydock_provisioner.config as config
import drydock_provisioner.objects as objects
from drydock_provisioner.ingester.ingester import Ingester
from drydock_provisioner.statemgmt.state import DrydockState
from drydock_provisioner.orchestrator.orchestrator import Orchestrator
logging.basicConfig(level=logging.DEBUG)
class TestClass(object):
def test_design_inheritance(self, input_files):
def test_design_inheritance(self, input_files, setup):
input_file = input_files.join("fullsite.yaml")
design_state = DesignState()
design_data = SiteDesign()
design_id = design_data.assign_id()
design_state.post_design(design_data)
design_state = DrydockState()
design_ref = "file://%s" % str(input_file)
ingester = Ingester()
ingester.enable_plugins(
['drydock_provisioner.ingester.plugins.yaml.YamlIngester'])
ingester.ingest_data(
plugin_name='yaml',
design_state=design_state,
design_id=str(design_id),
filenames=[str(input_file)])
ingester.enable_plugin(
'drydock_provisioner.ingester.plugins.yaml.YamlIngester')
orchestrator = Orchestrator(state_manager=design_state)
orchestrator = Orchestrator(
state_manager=design_state, ingester=ingester)
design_data = orchestrator.get_effective_site(design_id)
design_status, design_data = orchestrator.get_effective_site(
design_ref)
assert len(design_data.baremetal_nodes) == 2
@@ -67,8 +63,8 @@ class TestClass(object):
@pytest.fixture(scope='module')
def input_files(self, tmpdir_factory, request):
tmpdir = tmpdir_factory.mktemp('data')
samples_dir = os.path.dirname(
str(request.fspath)) + "/" + "../yaml_samples"
samples_dir = os.path.dirname(str(
request.fspath)) + "/" + "../yaml_samples"
samples = os.listdir(samples_dir)
for f in samples:
@@ -77,3 +73,30 @@ class TestClass(object):
shutil.copyfile(src_file, dst_file)
return tmpdir
@pytest.fixture(scope='module')
def setup(self):
objects.register_all()
logging.basicConfig()
req_opts = {
'default': [cfg.IntOpt('leader_grace_period')],
'database': [cfg.StrOpt('database_connect_string')],
'logging': [
cfg.StrOpt('global_logger_name', default='drydock'),
]
}
for k, v in req_opts.items():
config.config_mgr.conf.register_opts(v, group=k)
config.config_mgr.conf([])
config.config_mgr.conf.set_override(
name="database_connect_string",
group="database",
override=
"postgresql+psycopg2://drydock:drydock@localhost:5432/drydock")
config.config_mgr.conf.set_override(
name="leader_grace_period", group="default", override=15)
return


@@ -20,7 +20,7 @@ import drydock_provisioner.drydock_client.client as dc_client
def test_blank_session_error():
with pytest.raises(Exception):
dd_ses = dc_session.DrydockSession()
dc_session.DrydockSession()
def test_session_init_minimal():
@@ -52,7 +52,9 @@ def test_session_init_uuid_token():
def test_session_init_fernet_token():
host = 'foo.bar.baz'
token = 'gAAAAABU7roWGiCuOvgFcckec-0ytpGnMZDBLG9hA7Hr9qfvdZDHjsak39YN98HXxoYLIqVm19Egku5YR3wyI7heVrOmPNEtmr-fIM1rtahudEdEAPM4HCiMrBmiA1Lw6SU8jc2rPLC7FK7nBCia_BGhG17NVHuQu0S7waA306jyKNhHwUnpsBQ'
token = 'gAAAAABU7roWGiCuOvgFcckec-0ytpGnMZDBLG9hA7Hr9qfvdZDHjsak39YN98HXxoYLIqVm' \
'19Egku5YR3wyI7heVrOmPNEtmr-fIM1rtahudEdEAPM4HCiMrBmiA1Lw6SU8jc2rPLC7FK7n' \
'BCia_BGhG17NVHuQu0S7waA306jyKNhHwUnpsBQ'
dd_ses = dc_session.DrydockSession(host, token=token)
@@ -90,49 +92,6 @@ def test_session_get():
assert req.headers.get('X-Context-Marker', None) == marker
@responses.activate
def test_client_designs_get():
design_id = '828e88dc-6a8b-11e7-97ae-080027ef795a'
responses.add(
responses.GET,
'http://foo.bar.baz/api/v1.0/designs',
json=[design_id],
status=200)
host = 'foo.bar.baz'
token = '5f1e08b6-38ec-4a99-9d0f-00d29c4e325b'
dd_ses = dc_session.DrydockSession(host, token=token)
dd_client = dc_client.DrydockClient(dd_ses)
design_list = dd_client.get_design_ids()
assert design_id in design_list
@responses.activate
def test_client_design_get():
design = {
'id': '828e88dc-6a8b-11e7-97ae-080027ef795a',
'model_type': 'SiteDesign'
}
responses.add(
responses.GET,
'http://foo.bar.baz/api/v1.0/designs/828e88dc-6a8b-11e7-97ae-080027ef795a',
json=design,
status=200)
host = 'foo.bar.baz'
dd_ses = dc_session.DrydockSession(host)
dd_client = dc_client.DrydockClient(dd_ses)
design_resp = dd_client.get_design('828e88dc-6a8b-11e7-97ae-080027ef795a')
assert design_resp['id'] == design['id']
assert design_resp['model_type'] == design['model_type']
@responses.activate
def test_client_task_get():
task = {


@@ -12,76 +12,42 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from drydock_provisioner.ingester import Ingester
from drydock_provisioner.statemgmt import DesignState
from drydock_provisioner.ingester.ingester import Ingester
from drydock_provisioner.statemgmt.state import DrydockState
import drydock_provisioner.objects as objects
import drydock_provisioner.config as config
from oslo_config import cfg
import logging
import pytest
import shutil
import os
import drydock_provisioner.ingester.plugins.yaml
class TestClass(object):
def test_ingest_full_site(self, input_files):
def test_ingest_full_site(self, input_files, setup):
objects.register_all()
input_file = input_files.join("fullsite.yaml")
design_state = DesignState()
design_data = objects.SiteDesign()
design_id = design_data.assign_id()
design_state.post_design(design_data)
design_state = DrydockState()
design_ref = "file://%s" % str(input_file)
ingester = Ingester()
ingester.enable_plugins(
['drydock_provisioner.ingester.plugins.yaml.YamlIngester'])
ingester.ingest_data(
plugin_name='yaml',
design_state=design_state,
filenames=[str(input_file)],
design_id=design_id)
design_data = design_state.get_design(design_id)
ingester.enable_plugin(
'drydock_provisioner.ingester.plugins.yaml.YamlIngester')
design_status, design_data = ingester.ingest_data(
design_state=design_state, design_ref=design_ref)
assert len(design_data.host_profiles) == 2
assert len(design_data.baremetal_nodes) == 2
def test_ingest_federated_design(self, input_files):
objects.register_all()
profiles_file = input_files.join("fullsite_profiles.yaml")
networks_file = input_files.join("fullsite_networks.yaml")
nodes_file = input_files.join("fullsite_nodes.yaml")
design_state = DesignState()
design_data = objects.SiteDesign()
design_id = design_data.assign_id()
design_state.post_design(design_data)
ingester = Ingester()
ingester.enable_plugins(
['drydock_provisioner.ingester.plugins.yaml.YamlIngester'])
ingester.ingest_data(
plugin_name='yaml',
design_state=design_state,
design_id=design_id,
filenames=[
str(profiles_file),
str(networks_file),
str(nodes_file)
])
design_data = design_state.get_design(design_id)
assert len(design_data.host_profiles) == 2
@pytest.fixture(scope='module')
def input_files(self, tmpdir_factory, request):
tmpdir = tmpdir_factory.mktemp('data')
samples_dir = os.path.dirname(
str(request.fspath)) + "/" + "../yaml_samples"
samples_dir = os.path.dirname(str(
request.fspath)) + "/" + "../yaml_samples"
samples = os.listdir(samples_dir)
for f in samples:
@@ -90,3 +56,30 @@ class TestClass(object):
shutil.copyfile(src_file, dst_file)
return tmpdir
@pytest.fixture(scope='module')
def setup(self):
objects.register_all()
logging.basicConfig()
req_opts = {
'default': [cfg.IntOpt('leader_grace_period')],
'database': [cfg.StrOpt('database_connect_string')],
'logging': [
cfg.StrOpt('global_logger_name', default='drydock'),
]
}
for k, v in req_opts.items():
config.config_mgr.conf.register_opts(v, group=k)
config.config_mgr.conf([])
config.config_mgr.conf.set_override(
name="database_connect_string",
group="database",
override=
"postgresql+psycopg2://drydock:drydock@localhost:5432/drydock")
config.config_mgr.conf.set_override(
name="leader_grace_period", group="default", override=15)
return


@@ -13,73 +13,61 @@
# limitations under the License.
"""Test that rack models are properly parsed."""
from drydock_provisioner.ingester import Ingester
from drydock_provisioner.statemgmt import DesignState
from drydock_provisioner.ingester.ingester import Ingester
from drydock_provisioner.statemgmt.state import DrydockState
import drydock_provisioner.objects as objects
import drydock_provisioner.config as config
import drydock_provisioner.error as errors
from oslo_config import cfg
import logging
import pytest
import shutil
import os
import drydock_provisioner.ingester.plugins.yaml
class TestClass(object):
def test_rack_parse(self, input_files):
def test_rack_parse(self, input_files, setup):
objects.register_all()
input_file = input_files.join("fullsite.yaml")
design_state = DesignState()
design_data = objects.SiteDesign()
design_id = design_data.assign_id()
design_state.post_design(design_data)
design_state = DrydockState()
design_ref = "file://%s" % str(input_file)
ingester = Ingester()
ingester.enable_plugins(
['drydock_provisioner.ingester.plugins.yaml.YamlIngester'])
ingester.ingest_data(
plugin_name='yaml',
design_state=design_state,
filenames=[str(input_file)],
design_id=design_id)
design_data = design_state.get_design(design_id)
ingester.enable_plugin(
'drydock_provisioner.ingester.plugins.yaml.YamlIngester')
design_status, design_data = ingester.ingest_data(
design_state=design_state, design_ref=design_ref)
rack = design_data.get_rack('rack1')
assert rack.location.get('grid') == 'EG12'
def test_rack_not_found(self, input_files):
def test_rack_not_found(self, input_files, setup):
objects.register_all()
input_file = input_files.join("fullsite.yaml")
design_state = DesignState()
design_data = objects.SiteDesign()
design_id = design_data.assign_id()
design_state.post_design(design_data)
design_state = DrydockState()
design_ref = "file://%s" % str(input_file)
ingester = Ingester()
ingester.enable_plugins(
['drydock_provisioner.ingester.plugins.yaml.YamlIngester'])
ingester.ingest_data(
plugin_name='yaml',
design_state=design_state,
filenames=[str(input_file)],
design_id=design_id)
design_data = design_state.get_design(design_id)
ingester.enable_plugin(
'drydock_provisioner.ingester.plugins.yaml.YamlIngester')
design_status, design_data = ingester.ingest_data(
design_state=design_state, design_ref=design_ref)
with pytest.raises(errors.DesignError):
rack = design_data.get_rack('foo')
design_data.get_rack('foo')
@pytest.fixture(scope='module')
def input_files(self, tmpdir_factory, request):
tmpdir = tmpdir_factory.mktemp('data')
samples_dir = os.path.dirname(
str(request.fspath)) + "/" + "../yaml_samples"
samples_dir = os.path.dirname(str(
request.fspath)) + "/" + "../yaml_samples"
samples = os.listdir(samples_dir)
for f in samples:
@@ -88,3 +76,30 @@ class TestClass(object):
shutil.copyfile(src_file, dst_file)
return tmpdir
@pytest.fixture(scope='module')
def setup(self):
objects.register_all()
logging.basicConfig()
req_opts = {
'default': [cfg.IntOpt('leader_grace_period')],
'database': [cfg.StrOpt('database_connect_string')],
'logging': [
cfg.StrOpt('global_logger_name', default='drydock'),
]
}
for k, v in req_opts.items():
config.config_mgr.conf.register_opts(v, group=k)
config.config_mgr.conf([])
config.config_mgr.conf.set_override(
name="database_connect_string",
group="database",
override=
"postgresql+psycopg2://drydock:drydock@localhost:5432/drydock")
config.config_mgr.conf.set_override(
name="leader_grace_period", group="default", override=15)
return


@@ -14,7 +14,6 @@
import pytest
import shutil
import os
import uuid
import logging
from drydock_provisioner.ingester.plugins.yaml import YamlIngester
@@ -28,8 +27,12 @@ class TestClass(object):
ingester = YamlIngester()
models = ingester.ingest_data(filenames=[str(input_file)])
f = open(str(input_file), 'rb')
yaml_string = f.read()
status, models = ingester.ingest_data(content=yaml_string)
assert status.status == 'success'
assert len(models) == 1
def test_ingest_multidoc(self, input_files):
@@ -37,15 +40,19 @@ class TestClass(object):
ingester = YamlIngester()
models = ingester.ingest_data(filenames=[str(input_file)])
f = open(str(input_file), 'rb')
yaml_string = f.read()
status, models = ingester.ingest_data(content=yaml_string)
assert status.status == 'success'
assert len(models) == 3
@pytest.fixture(scope='module')
def input_files(self, tmpdir_factory, request):
tmpdir = tmpdir_factory.mktemp('data')
samples_dir = os.path.dirname(
str(request.fspath)) + "/" + "../yaml_samples"
samples_dir = os.path.dirname(str(
request.fspath)) + "/" + "../yaml_samples"
samples = os.listdir(samples_dir)
for f in samples:


@@ -18,7 +18,7 @@ import math
from drydock_provisioner import error
from drydock_provisioner.drivers.node.maasdriver.driver import MaasTaskRunner
from drydock_provisioner.drivers.node.maasdriver.actions.node import ApplyNodeStorage
from drydock_provisioner.drivers.node.maasdriver.models.blockdev import BlockDevice
from drydock_provisioner.drivers.node.maasdriver.models.volumegroup import VolumeGroup
@@ -31,7 +31,7 @@ class TestCalculateBytes():
drive = BlockDevice(None, size=drive_size, available_size=drive_size)
calc_size = MaasTaskRunner.calculate_bytes(
calc_size = ApplyNodeStorage.calculate_bytes(
size_str=size_str, context=drive)
assert calc_size == 15 * 1000 * 1000
@@ -42,7 +42,7 @@ class TestCalculateBytes():
drive_size = 20 * 1000 * 1000
drive = BlockDevice(None, size=drive_size, available_size=drive_size)
calc_size = MaasTaskRunner.calculate_bytes(
calc_size = ApplyNodeStorage.calculate_bytes(
size_str=size_str, context=drive)
assert calc_size == 15 * 1000 * 1000
@@ -53,7 +53,7 @@ class TestCalculateBytes():
drive_size = 20 * 1000 * 1000
drive = BlockDevice(None, size=drive_size, available_size=drive_size)
calc_size = MaasTaskRunner.calculate_bytes(
calc_size = ApplyNodeStorage.calculate_bytes(
size_str=size_str, context=drive)
assert calc_size == 15 * 1000 * 1000
@@ -64,7 +64,7 @@ class TestCalculateBytes():
drive_size = 20 * 1000 * 1000
drive = BlockDevice(None, size=drive_size, available_size=drive_size)
calc_size = MaasTaskRunner.calculate_bytes(
calc_size = ApplyNodeStorage.calculate_bytes(
size_str=size_str, context=drive)
assert calc_size == 15 * 1000 * 1000
@@ -75,7 +75,7 @@ class TestCalculateBytes():
drive_size = 20 * 1000 * 1000 * 1000
drive = BlockDevice(None, size=drive_size, available_size=drive_size)
calc_size = MaasTaskRunner.calculate_bytes(
calc_size = ApplyNodeStorage.calculate_bytes(
size_str=size_str, context=drive)
assert calc_size == 15 * 1000 * 1000 * 1000
@@ -86,7 +86,7 @@ class TestCalculateBytes():
drive_size = 20 * 1000 * 1000 * 1000
drive = BlockDevice(None, size=drive_size, available_size=drive_size)
calc_size = MaasTaskRunner.calculate_bytes(
calc_size = ApplyNodeStorage.calculate_bytes(
size_str=size_str, context=drive)
assert calc_size == 15 * 1000 * 1000 * 1000
@@ -97,7 +97,7 @@ class TestCalculateBytes():
drive_size = 20 * 1000 * 1000 * 1000
drive = BlockDevice(None, size=drive_size, available_size=drive_size)
calc_size = MaasTaskRunner.calculate_bytes(
calc_size = ApplyNodeStorage.calculate_bytes(
size_str=size_str, context=drive)
assert calc_size == 15 * 1000 * 1000 * 1000
@@ -108,7 +108,7 @@ class TestCalculateBytes():
drive_size = 20 * 1000 * 1000 * 1000
drive = BlockDevice(None, size=drive_size, available_size=drive_size)
calc_size = MaasTaskRunner.calculate_bytes(
calc_size = ApplyNodeStorage.calculate_bytes(
size_str=size_str, context=drive)
assert calc_size == 15 * 1000 * 1000 * 1000
@@ -119,7 +119,7 @@ class TestCalculateBytes():
drive_size = 20 * 1000 * 1000 * 1000 * 1000
drive = BlockDevice(None, size=drive_size, available_size=drive_size)
calc_size = MaasTaskRunner.calculate_bytes(
calc_size = ApplyNodeStorage.calculate_bytes(
size_str=size_str, context=drive)
assert calc_size == 15 * 1000 * 1000 * 1000 * 1000
@@ -130,7 +130,7 @@ class TestCalculateBytes():
drive_size = 20 * 1000 * 1000 * 1000 * 1000
drive = BlockDevice(None, size=drive_size, available_size=drive_size)
calc_size = MaasTaskRunner.calculate_bytes(
calc_size = ApplyNodeStorage.calculate_bytes(
size_str=size_str, context=drive)
assert calc_size == 15 * 1000 * 1000 * 1000 * 1000
@@ -141,7 +141,7 @@ class TestCalculateBytes():
drive_size = 20 * 1000 * 1000 * 1000 * 1000
drive = BlockDevice(None, size=drive_size, available_size=drive_size)
calc_size = MaasTaskRunner.calculate_bytes(
calc_size = ApplyNodeStorage.calculate_bytes(
size_str=size_str, context=drive)
assert calc_size == 15 * 1000 * 1000 * 1000 * 1000
@@ -152,7 +152,7 @@ class TestCalculateBytes():
drive_size = 20 * 1000 * 1000 * 1000 * 1000
drive = BlockDevice(None, size=drive_size, available_size=drive_size)
calc_size = MaasTaskRunner.calculate_bytes(
calc_size = ApplyNodeStorage.calculate_bytes(
size_str=size_str, context=drive)
assert calc_size == 15 * 1000 * 1000 * 1000 * 1000
@@ -165,7 +165,7 @@ class TestCalculateBytes():
drive = BlockDevice(None, size=drive_size, available_size=drive_size)
calc_size = MaasTaskRunner.calculate_bytes(
calc_size = ApplyNodeStorage.calculate_bytes(
size_str=size_str, context=drive)
assert calc_size == part_size
@@ -178,7 +178,7 @@ class TestCalculateBytes():
vg = VolumeGroup(None, size=vg_size, available_size=vg_size)
calc_size = MaasTaskRunner.calculate_bytes(
calc_size = ApplyNodeStorage.calculate_bytes(
size_str=size_str, context=vg)
assert calc_size == lv_size
@@ -187,25 +187,22 @@ class TestCalculateBytes():
'''When calculated space is higher than available space, raise an exception.'''
vg_size = 20 * 1000 * 1000 # 20 mb drive
vg_available = 10 # 10 bytes available
lv_size = math.floor(.8 * vg_size) # calculate 80% of drive size
size_str = '80%'
vg = VolumeGroup(None, size=vg_size, available_size=vg_available)
with pytest.raises(error.NotEnoughStorage):
calc_size = MaasTaskRunner.calculate_bytes(
size_str=size_str, context=vg)
ApplyNodeStorage.calculate_bytes(size_str=size_str, context=vg)
def test_calculate_min_label(self):
'''Adding the min marker '>' should provision all available space.'''
vg_size = 20 * 1000 * 1000 # 20 mb drive
vg_available = 15 * 1000 * 1000
lv_size = math.floor(.1 * vg_size) # calculate 20% of drive size
size_str = '>10%'
vg = VolumeGroup(None, size=vg_size, available_size=vg_available)
calc_size = MaasTaskRunner.calculate_bytes(
calc_size = ApplyNodeStorage.calculate_bytes(
size_str=size_str, context=vg)
assert calc_size == vg_available
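The expectations in the tests above pin down the `calculate_bytes` contract: SI (power-of-1000) unit suffixes, percentage sizes computed against the context's total size, a `NotEnoughStorage` error when the result exceeds available space, and a `>` prefix that claims all remaining space once the minimum fits. A minimal sketch consistent with those tests (the standalone signature and regex are assumptions, not the MaaS driver's actual code):

```python
import math
import re

class NotEnoughStorage(Exception):
    """Raised when a requested size exceeds the space available."""

_UNITS = {'b': 1, 'kb': 1000, 'mb': 1000**2,
          'gb': 1000**3, 'tb': 1000**4}

def calculate_bytes(size_str, size, available_size):
    """Resolve a size string against a device/VG with `size` total bytes
    and `available_size` bytes free."""
    claim_all = size_str.startswith('>')  # '>' marks a minimum size
    if claim_all:
        size_str = size_str[1:]
    if size_str.endswith('%'):
        # Percentages are taken against the total size of the context
        calc = math.floor(float(size_str[:-1]) / 100 * size)
    else:
        m = re.fullmatch(r'(\d+)(b|kb|mb|gb|tb)', size_str.lower())
        if m is None:
            raise ValueError("Invalid size string: %s" % size_str)
        calc = int(m.group(1)) * _UNITS[m.group(2)]
    if calc > available_size:
        raise NotEnoughStorage()
    # With the '>' marker, provision everything left once the minimum fits
    return available_size if claim_all else calc
```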


@@ -12,8 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
import drydock_provisioner.objects as objects
from drydock_provisioner.objects import fields


@@ -1,70 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Generic testing for the orchestrator
#
import threading
import time
import drydock_provisioner.orchestrator as orch
import drydock_provisioner.objects.fields as hd_fields
import drydock_provisioner.statemgmt as statemgmt
import drydock_provisioner.objects.task as task
import drydock_provisioner.drivers as drivers
class TestClass(object):
def test_task_complete(self):
state_mgr = statemgmt.DesignState()
orchestrator = orch.Orchestrator(state_manager=state_mgr)
orch_task = orchestrator.create_task(
task.OrchestratorTask,
site='default',
action=hd_fields.OrchestratorAction.Noop)
orchestrator.execute_task(orch_task.get_id())
orch_task = state_mgr.get_task(orch_task.get_id())
assert orch_task.get_status() == hd_fields.TaskStatus.Complete
for t_id in orch_task.subtasks:
t = state_mgr.get_task(t_id)
assert t.get_status() == hd_fields.TaskStatus.Complete
def test_task_termination(self):
state_mgr = statemgmt.DesignState()
orchestrator = orch.Orchestrator(state_manager=state_mgr)
orch_task = orchestrator.create_task(
task.OrchestratorTask,
site='default',
action=hd_fields.OrchestratorAction.Noop)
orch_thread = threading.Thread(
target=orchestrator.execute_task, args=(orch_task.get_id(), ))
orch_thread.start()
time.sleep(1)
orchestrator.terminate_task(orch_task.get_id())
while orch_thread.is_alive():
time.sleep(1)
orch_task = state_mgr.get_task(orch_task.get_id())
assert orch_task.get_status() == hd_fields.TaskStatus.Terminated
for t_id in orch_task.subtasks:
t = state_mgr.get_task(t_id)
assert t.get_status() == hd_fields.TaskStatus.Terminated


@@ -15,8 +15,6 @@ import uuid
from drydock_provisioner.policy import DrydockPolicy
from drydock_provisioner.control.base import DrydockRequestContext
import pytest
class TestDefaultRules():
def test_register_policy(self, mocker):


@@ -1,44 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
import shutil
import drydock_provisioner.objects as objects
import drydock_provisioner.statemgmt as statemgmt
class TestClass(object):
def test_sitedesign_post(self):
objects.register_all()
state_manager = statemgmt.DesignState()
design_data = objects.SiteDesign()
design_id = design_data.assign_id()
initial_site = objects.Site()
initial_site.name = 'testsite'
net_a = objects.Network()
net_a.name = 'net_a'
net_a.region = 'testsite'
net_a.cidr = '172.16.0.0/24'
design_data.set_site(initial_site)
design_data.add_network(net_a)
state_manager.post_design(design_data)
my_design = state_manager.get_design(design_id)
assert design_data.obj_to_primitive() == my_design.obj_to_primitive()


@@ -477,17 +477,17 @@ spec:
# Map hardware addresses to aliases/roles to allow a mix of hardware configs
# in a site to result in a consistent configuration
device_aliases:
- address: '0000:00:03.0'
alias: prim_nic01
prim_nic01:
address: '0000:00:03.0'
# type could identify expected hardware - used for hardware manifest validation
dev_type: '82540EM Gigabit Ethernet Controller'
bus_type: 'pci'
- address: '0000:00:04.0'
alias: prim_nic02
prim_nic02:
address: '0000:00:04.0'
dev_type: '82540EM Gigabit Ethernet Controller'
bus_type: 'pci'
- address: '2:0.0.0'
alias: primary_boot
primary_boot:
address: '2:0.0.0'
dev_type: 'VBOX HARDDISK'
bus_type: 'scsi'
...
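The hunk above converts `device_aliases` from a list of `{address, alias}` entries to a mapping keyed by alias, which makes alias lookups direct instead of a scan over the list. A sketch of consuming the new shape (the dict below is transcribed from the sample document, not loaded from the YAML file):

```python
# device_aliases in the new mapping form, keyed by alias
device_aliases = {
    'prim_nic01': {'address': '0000:00:03.0',
                   'dev_type': '82540EM Gigabit Ethernet Controller',
                   'bus_type': 'pci'},
    'prim_nic02': {'address': '0000:00:04.0',
                   'dev_type': '82540EM Gigabit Ethernet Controller',
                   'bus_type': 'pci'},
    'primary_boot': {'address': '2:0.0.0',
                     'dev_type': 'VBOX HARDDISK',
                     'bus_type': 'scsi'},
}

def resolve_alias(aliases, alias):
    """Return the bus address for an alias, or None if undefined.
    With the old list-of-dicts form this required a linear search."""
    entry = aliases.get(alias)
    return entry['address'] if entry else None
```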


@@ -19,6 +19,7 @@ commands=
whitelist_externals=find
commands=
yapf -i -r --style=pep8 {toxinidir}/setup.py
yapf -i -r --style=pep8 {toxinidir}/alembic
yapf -i -r --style=pep8 {toxinidir}/drydock_provisioner
yapf -i -r --style=pep8 {toxinidir}/tests
find {toxinidir}/drydock_provisioner -name '__init__.py' -exec yapf -i --style=pep8 \{\} ;
@@ -51,12 +52,12 @@ commands = flake8 \
commands = bandit -r drydock_provisioner -n 5
[flake8]
ignore=E302,H306,H304,D101,D102,D103,D104
exclude= venv,.venv,.git,.idea,.tox,*.egg-info,*.eggs,bin,dist,./build/
ignore=E302,H306,H304,W503,E251
exclude= venv,.venv,.git,.idea,.tox,*.egg-info,*.eggs,bin,dist,./build/,alembic/
max-line-length=119
[testenv:docs]
whitelist_externals=rm
commands =
rm -rf docs/build
python setup.py build_sphinx {posargs}
sphinx-build -b html docs/source docs/build