YAPF catchup

Commit a bunch of formatting changes
caught by yapf during the last PS.

Change-Id: I8e3092c8a45fc2bde258b4a873bbfb32bec7ae1d
This commit is contained in:
Scott Hussey 2017-12-08 16:42:21 -06:00
parent ae87cd1714
commit 1c78477e95
21 changed files with 233 additions and 109 deletions

View File

@ -29,6 +29,7 @@ from .base import StatefulResource
logger = logging.getLogger('drydock')
class BootactionResource(StatefulResource):
bootaction_schema = {
'$schema': 'http://json-schema.org/schema#',

View File

@ -72,7 +72,9 @@ class ValidationResource(StatefulResource):
design_ref)
resp_message['details']['errorCount'] = message.error_count
resp_message['details']['messageList'] = [m.to_dict() for m in message.message_list]
resp_message['details']['messageList'] = [
m.to_dict() for m in message.message_list
]
if message.error_count == 0:
resp_message['status'] = 'Valid'

View File

@ -50,6 +50,9 @@ class NodeDriver(ProviderDriver):
task_action = task.action
if task_action in self.supported_actions:
task.success()
task.set_status(hd_fields.TaskStatus.Complete)
task.save()
return
else:
raise errors.DriverError("Unsupported action %s for driver %s" %

View File

@ -70,8 +70,8 @@ class PyghmiBaseAction(BaseAction):
self.logger.debug("Initializing IPMI session")
ipmi_session = self.get_ipmi_session(node)
except (IpmiException, errors.DriverError) as iex:
self.logger.error("Error initializing IPMI session for node %s"
% node.name)
self.logger.error(
"Error initializing IPMI session for node %s" % node.name)
self.logger.debug("IPMI Exception: %s" % str(iex))
self.logger.warning(
"IPMI command failed, retrying after 15 seconds...")

View File

@ -89,7 +89,8 @@ class Ingester(object):
"Ingester:ingest_data ingesting design parts for design %s" %
design_ref)
design_blob = design_state.get_design_documents(design_ref)
self.logger.debug("Ingesting design data of %d bytes." % len(design_blob))
self.logger.debug(
"Ingesting design data of %d bytes." % len(design_blob))
try:
status, design_items = self.registered_plugin.ingest_data(

View File

@ -78,9 +78,11 @@ class DeckhandIngester(IngesterPlugin):
ps.set_status(hd_fields.ActionResult.Success)
for d in parsed_data:
try:
(schema_ns, doc_kind, doc_version) = d.get('schema', '').split('/')
(schema_ns, doc_kind, doc_version) = d.get('schema',
'').split('/')
except ValueError as ex:
self.logger.error("Error with document structure.", exc_info=ex)
self.logger.error(
"Error with document structure.", exc_info=ex)
self.logger.debug("Error document\n%s" % yaml.dump(d))
continue
if schema_ns == 'drydock':

View File

@ -300,8 +300,8 @@ class Task(object):
for st in self.statemgr.get_complete_subtasks(self.task_id):
if action_filter is None or (action_filter is not None
and st.action == action_filter):
self.logger.debug(
"Collecting result status from subtask %s." % str(st.task_id))
self.logger.debug("Collecting result status from subtask %s." %
str(st.task_id))
if st.get_result() in [
hd_fields.ActionResult.Success,
hd_fields.ActionResult.PartialSuccess

View File

@ -567,22 +567,27 @@ class Orchestrator(object):
for n in site_design.networks:
if n.routedomain is not None:
if n.routedomain not in routedomains:
self.logger.info("Adding routedomain %s to render map." % n.routedomain)
self.logger.info("Adding routedomain %s to render map."
% n.routedomain)
routedomains[n.routedomain] = list()
routedomains[n.routedomain].append(n)
for rd, nl in routedomains.items():
rd_cidrs = [n.cidr for n in nl]
self.logger.debug("Target CIDRs for routedomain %s: %s" % (rd, ','.join(rd_cidrs)))
self.logger.debug("Target CIDRs for routedomain %s: %s" %
(rd, ','.join(rd_cidrs)))
for n in site_design.networks:
gw = None
metric = None
for r in n.routes:
if 'routedomain' in r and r.get('routedomain', None) == rd:
if 'routedomain' in r and r.get('routedomain',
None) == rd:
gw = r.get('gateway')
metric = r.get('metric')
self.logger.debug("Use gateway %s for routedomain %s on network %s." %
(gw, rd, n.get_name()))
self.logger.debug(
"Use gateway %s for routedomain %s on network %s."
% (gw, rd, n.get_name()))
break
if gw is not None and metric is not None:
for cidr in rd_cidrs:
n.routes.append(dict(subnet=cidr, gateway=gw, metric=metric))
n.routes.append(
dict(subnet=cidr, gateway=gw, metric=metric))

View File

@ -41,7 +41,8 @@ class Validator():
output = rule(site_design)
result_status.message_list.extend(output)
error_msg = [m for m in output if m.error]
result_status.error_count = result_status.error_count + len(error_msg)
result_status.error_count = result_status.error_count + len(
error_msg)
if len(error_msg) > 0:
validation_error = True
@ -70,47 +71,71 @@ class Validator():
if any([
network_link.get(x)
for x in [
'bonding_peer_rate', 'bonding_xmit_hash', 'bonding_mon_rate', 'bonding_up_delay',
'bonding_peer_rate', 'bonding_xmit_hash',
'bonding_mon_rate', 'bonding_up_delay',
'bonding_down_delay'
]
]):
msg = ('Network Link Bonding Error: If bonding mode is disabled no other bond option can be'
'specified; on BaremetalNode %s' % network_link.get('name'))
msg = (
'Network Link Bonding Error: If bonding mode is disabled no other bond option can be'
'specified; on BaremetalNode %s' %
network_link.get('name'))
message_list.append(TaskStatusMessage(msg=msg, error=True, ctx_type='NA', ctx='NA'))
message_list.append(
TaskStatusMessage(
msg=msg, error=True, ctx_type='NA', ctx='NA'))
elif bonding_mode == '802.3ad':
# check if up_delay and down_delay are >= mon_rate
mon_rate = network_link.get('bonding_mon_rate')
if network_link.get('bonding_up_delay') < mon_rate:
msg = ('Network Link Bonding Error: Up delay is less '
'than mon rate on BaremetalNode %s' % (network_link.get('name')))
'than mon rate on BaremetalNode %s' %
(network_link.get('name')))
message_list.append(TaskStatusMessage(msg=msg, error=True, ctx_type='NA', ctx='NA'))
message_list.append(
TaskStatusMessage(
msg=msg, error=True, ctx_type='NA', ctx='NA'))
if network_link.get('bonding_down_delay') < mon_rate:
msg = ('Network Link Bonding Error: Down delay is '
'less than mon rate on BaremetalNode %s' % (network_link.get('name')))
'less than mon rate on BaremetalNode %s' %
(network_link.get('name')))
message_list.append(TaskStatusMessage(msg=msg, error=True, ctx_type='NA', ctx='NA'))
message_list.append(
TaskStatusMessage(
msg=msg, error=True, ctx_type='NA', ctx='NA'))
elif bonding_mode in ['active-backup', 'balanced-rr']:
# make sure hash and peer_rate are NOT defined
if network_link.get('bonding_xmit_hash'):
msg = ('Network Link Bonding Error: Hash cannot be defined if bond mode is '
'%s, on BaremetalNode %s' % (bonding_mode, network_link.get('name')))
msg = (
'Network Link Bonding Error: Hash cannot be defined if bond mode is '
'%s, on BaremetalNode %s' % (bonding_mode,
network_link.get('name')))
message_list.append(TaskStatusMessage(msg=msg, error=True, ctx_type='NA', ctx='NA'))
message_list.append(
TaskStatusMessage(
msg=msg, error=True, ctx_type='NA', ctx='NA'))
if network_link.get('bonding_peer_rate'):
msg = ('Network Link Bonding Error: Peer rate cannot be defined if bond mode is '
'%s, on BaremetalNode %s' % (bonding_mode, network_link.get('name')))
msg = (
'Network Link Bonding Error: Peer rate cannot be defined if bond mode is '
'%s, on BaremetalNode %s' % (bonding_mode,
network_link.get('name')))
message_list.append(TaskStatusMessage(msg=msg, error=True, ctx_type='NA', ctx='NA'))
message_list.append(
TaskStatusMessage(
msg=msg, error=True, ctx_type='NA', ctx='NA'))
if not message_list:
message_list.append(TaskStatusMessage(msg='Network Link Bonding', error=False, ctx_type='NA', ctx='NA'))
message_list.append(
TaskStatusMessage(
msg='Network Link Bonding',
error=False,
ctx_type='NA',
ctx='NA'))
return message_list
@classmethod
@ -127,26 +152,39 @@ class Validator():
for network_link in network_link_list:
allowed_networks = network_link.get('allowed_networks', [])
# if allowed networks > 1 trunking must be enabled
if (len(allowed_networks) > 1
and network_link.get('trunk_mode') == hd_fields.NetworkLinkTrunkingMode.Disabled):
if (len(allowed_networks) > 1 and network_link.get('trunk_mode') ==
hd_fields.NetworkLinkTrunkingMode.Disabled):
msg = ('Rational Network Trunking Error: If there is more than 1 allowed network,'
'trunking mode must be enabled; on NetworkLink %s' % network_link.get('name'))
msg = (
'Rational Network Trunking Error: If there is more than 1 allowed network,'
'trunking mode must be enabled; on NetworkLink %s' %
network_link.get('name'))
message_list.append(TaskStatusMessage(msg=msg, error=True, ctx_type='NA', ctx='NA'))
message_list.append(
TaskStatusMessage(
msg=msg, error=True, ctx_type='NA', ctx='NA'))
# trunking mode is disabled, default_network must be defined
if (network_link.get('trunk_mode') == hd_fields.NetworkLinkTrunkingMode.Disabled
if (network_link.get(
'trunk_mode') == hd_fields.NetworkLinkTrunkingMode.Disabled
and network_link.get('native_network') is None):
msg = ('Rational Network Trunking Error: Trunking mode is disabled, a trunking'
'default_network must be defined; on NetworkLink %s' % network_link.get('name'))
msg = (
'Rational Network Trunking Error: Trunking mode is disabled, a trunking'
'default_network must be defined; on NetworkLink %s' %
network_link.get('name'))
message_list.append(TaskStatusMessage(msg=msg, error=True, ctx_type='NA', ctx='NA'))
message_list.append(
TaskStatusMessage(
msg=msg, error=True, ctx_type='NA', ctx='NA'))
if not message_list:
message_list.append(
TaskStatusMessage(msg='Rational Network Trunking', error=False, ctx_type='NA', ctx='NA'))
TaskStatusMessage(
msg='Rational Network Trunking',
error=False,
ctx_type='NA',
ctx='NA'))
return message_list
@classmethod
@ -169,12 +207,15 @@ class Validator():
volume_group = storage_device.get('volume_group')
# error if both or neither is defined
if all([partitions_list, volume_group]) or not any([partitions_list, volume_group]):
if all([partitions_list, volume_group
]) or not any([partitions_list, volume_group]):
msg = ('Storage Partitioning Error: Either a volume group '
'OR partitions must be defined for each storage '
'device; on BaremetalNode '
'%s' % baremetal_node.get('name'))
message_list.append(TaskStatusMessage(msg=msg, error=True, ctx_type='NA', ctx='NA'))
message_list.append(
TaskStatusMessage(
msg=msg, error=True, ctx_type='NA', ctx='NA'))
# if there is a volume group add to list
if volume_group is not None:
@ -190,7 +231,12 @@ class Validator():
msg = ('Storage Partitioning Error: Both a volume group AND file system cannot be '
'defined in a sigle partition; on BaremetalNode %s' % baremetal_node.get('name'))
message_list.append(TaskStatusMessage(msg=msg, error=True, ctx_type='NA', ctx='NA'))
message_list.append(
TaskStatusMessage(
msg=msg,
error=True,
ctx_type='NA',
ctx='NA'))
# if there is a volume group add to list
if partition_volume_group is not None:
@ -206,10 +252,17 @@ class Validator():
'partition; volume group %s on BaremetalNode %s' %
(volume_group.get('name'), baremetal_node.get('name')))
message_list.append(TaskStatusMessage(msg=msg, error=True, ctx_type='NA', ctx='NA'))
message_list.append(
TaskStatusMessage(
msg=msg, error=True, ctx_type='NA', ctx='NA'))
if not message_list:
message_list.append(TaskStatusMessage(msg='Storage Partitioning', error=False, ctx_type='NA', ctx='NA'))
message_list.append(
TaskStatusMessage(
msg='Storage Partitioning',
error=False,
ctx_type='NA',
ctx='NA'))
return message_list
@classmethod
@ -236,11 +289,16 @@ class Validator():
for network_link_name_2 in compare:
if (network_link_name is not network_link_name_2
and sorted([network_link_name, network_link_name_2]) not in checked_pairs):
checked_pairs.append(sorted([network_link_name, network_link_name_2]))
and sorted([network_link_name, network_link_name_2
]) not in checked_pairs):
checked_pairs.append(
sorted([network_link_name, network_link_name_2]))
allowed_network_list_2 = compare[network_link_name_2]
# creates a list of duplicated allowed networks
duplicated_names = [i for i in allowed_network_list_1 if i in allowed_network_list_2]
duplicated_names = [
i for i in allowed_network_list_1
if i in allowed_network_list_2
]
for name in duplicated_names:
msg = ('Unique Network Error: Allowed network %s duplicated on NetworkLink %s and NetworkLink '
@ -248,7 +306,10 @@ class Validator():
message_list.append(TaskStatusMessage(msg=msg, error=True, ctx_type='NA', ctx='NA'))
if not message_list:
message_list.append(TaskStatusMessage(msg='Unique Network', error=False, ctx_type='NA', ctx='NA'))
message_list.append(
TaskStatusMessage(
msg='Unique Network', error=False, ctx_type='NA',
ctx='NA'))
return message_list
@classmethod

View File

@ -125,8 +125,7 @@ class DrydockPolicy(object):
validation_rules = [
policy.DocumentedRuleDefault(
'physical_provisioner:validate_site_design', 'role:admin',
'Validate site design',
[{
'Validate site design', [{
'path': '/api/v1.0/validatedesign',
'method': 'POST'
}]),

View File

@ -95,14 +95,16 @@ class ReferenceResolver(object):
"""
ks_sess = KeystoneUtils.get_session()
(new_scheme, foo) = re.subn('^[^+]+\+', '', design_uri.scheme)
url = urllib.parse.urlunparse((new_scheme, design_uri.netloc, design_uri.path,
design_uri.params, design_uri.query, design_uri.fragment))
url = urllib.parse.urlunparse(
(new_scheme, design_uri.netloc, design_uri.path, design_uri.params,
design_uri.query, design_uri.fragment))
logger = logging.getLogger(__name__)
logger.debug("Calling Keystone session for url %s" % str(url))
resp = ks_sess.get(url)
if resp.status_code >= 400:
raise errors.InvalidDesignReference(
"Received error code for reference %s: %s - %s" % (url, str(resp.status_code), resp.text))
"Received error code for reference %s: %s - %s" %
(url, str(resp.status_code), resp.text))
return resp.content
scheme_handlers = {

View File

@ -39,6 +39,7 @@ class KeystoneUtils(object):
auth = v3.Password(**auth_info)
return session.Session(auth=auth)
class NoAuthFilter(object):
"""PasteDeploy filter for NoAuth to be used in testing."""
@ -58,8 +59,10 @@ class NoAuthFilter(object):
environ['HTTP_X_IDENTITY_STATUS'] = 'Confirmed'
for envvar in ['USER_NAME', 'USER_ID', 'USER_DOMAIN_ID', 'PROJECT_ID',
'PROJECT_DOMAIN_NAME']:
for envvar in [
'USER_NAME', 'USER_ID', 'USER_DOMAIN_ID', 'PROJECT_ID',
'PROJECT_DOMAIN_NAME'
]:
varname = "HTTP_X_%s" % envvar
environ[varname] = 'noauth'
@ -75,6 +78,7 @@ class NoAuthFilter(object):
return self.app(environ, start_response)
def noauth_filter_factory(global_conf, forged_roles):
"""Create a NoAuth paste deploy filter
@ -84,4 +88,5 @@ def noauth_filter_factory(global_conf, forged_roles):
def filter(app):
return NoAuthFilter(app, forged_roles)
return filter

View File

@ -33,6 +33,7 @@ def deckhand_ingester():
'drydock_provisioner.ingester.plugins.deckhand.DeckhandIngester')
return ingester
@pytest.fixture()
def yaml_ingester():
ingester = Ingester()
@ -40,18 +41,21 @@ def yaml_ingester():
'drydock_provisioner.ingester.plugins.yaml.YamlIngester')
return ingester
@pytest.fixture()
def deckhand_orchestrator(drydock_state, deckhand_ingester):
orchestrator = Orchestrator(
state_manager=drydock_state, ingester=deckhand_ingester)
return orchestrator
@pytest.fixture()
def yaml_orchestrator(drydock_state, yaml_ingester):
orchestrator = Orchestrator(
state_manager=drydock_state, ingester=yaml_ingester)
return orchestrator
@pytest.fixture()
def blank_state(drydock_state):
drydock_state.tabularasa()

View File

@ -82,7 +82,8 @@ class TestValidationApi(object):
assert result.status == falcon.HTTP_400
@pytest.fixture()
def falcontest(self, drydock_state, deckhand_ingester, deckhand_orchestrator):
def falcontest(self, drydock_state, deckhand_ingester,
deckhand_orchestrator):
"""Create a test harness for the the Falcon API framework."""
policy.policy_engine = policy.DrydockPolicy()
policy.policy_engine.register_policy()

View File

@ -23,7 +23,8 @@ from drydock_provisioner.control.bootaction import BootactionUtils
class TestClass(object):
def test_bootaction_tarbuilder(self, input_files, deckhand_ingester, setup):
def test_bootaction_tarbuilder(self, input_files, deckhand_ingester,
setup):
objects.register_all()
input_file = input_files.join("deckhand_fullsite.yaml")

View File

@ -18,7 +18,8 @@ import drydock_provisioner.objects as objects
class TestClass(object):
def test_node_filter_obj(self, input_files, setup, deckhand_orchestrator, deckhand_ingester):
def test_node_filter_obj(self, input_files, setup, deckhand_orchestrator,
deckhand_ingester):
input_file = input_files.join("deckhand_fullsite.yaml")
design_state = DrydockState()
@ -37,7 +38,8 @@ class TestClass(object):
assert len(node_list) == 1
def test_node_filter_dict(self, input_files, setup, deckhand_orchestrator, deckhand_ingester):
def test_node_filter_dict(self, input_files, setup, deckhand_orchestrator,
deckhand_ingester):
input_file = input_files.join("deckhand_fullsite.yaml")
design_state = DrydockState()

View File

@ -1,4 +1,3 @@
import yaml
import jsonschema
import pkg_resources
@ -8,6 +7,7 @@ import pytest
from jsonschema.exceptions import ValidationError
class BaseSchemaValidationTest(object):
def _test_validate(self, schema, expect_failure, input_files, input):
"""validates input yaml against schema.
@ -35,94 +35,121 @@ class BaseSchemaValidationTest(object):
class TestValidation(BaseSchemaValidationTest):
def test_validate_baremetalNode(self, input_files):
self._test_validate('baremetalNode.yaml', False, input_files, "baremetalNode.yaml")
self._test_validate('baremetalNode.yaml', False, input_files,
"baremetalNode.yaml")
def test_validate_baremetalNode2(self, input_files):
self._test_validate('baremetalNode.yaml', False, input_files, "baremetalNode2.yaml")
self._test_validate('baremetalNode.yaml', False, input_files,
"baremetalNode2.yaml")
def test_invalidate_baremetalNode(self, input_files):
self._test_validate('baremetalNode.yaml', True, input_files, "invalid_baremetalNode.yaml")
self._test_validate('baremetalNode.yaml', True, input_files,
"invalid_baremetalNode.yaml")
def test_invalidate_baremetalNode2(self, input_files):
self._test_validate('baremetalNode.yaml', True, input_files, "invalid_baremetalNode2.yaml")
self._test_validate('baremetalNode.yaml', True, input_files,
"invalid_baremetalNode2.yaml")
def test_validate_hardwareProfile(self, input_files):
self._test_validate('hardwareProfile.yaml', False, input_files, "hardwareProfile.yaml")
self._test_validate('hardwareProfile.yaml', False, input_files,
"hardwareProfile.yaml")
def test_invalidate_hardwareProfile(self, input_files):
self._test_validate('hardwareProfile.yaml', True, input_files, "invalid_hardwareProfile.yaml")
self._test_validate('hardwareProfile.yaml', True, input_files,
"invalid_hardwareProfile.yaml")
def test_validate_hostProfile(self, input_files):
self._test_validate('hostProfile.yaml', False, input_files, "hostProfile.yaml")
self._test_validate('hostProfile.yaml', False, input_files,
"hostProfile.yaml")
def test_validate_hostProfile2(self, input_files):
self._test_validate('hostProfile.yaml', False, input_files, "hostProfile2.yaml")
self._test_validate('hostProfile.yaml', False, input_files,
"hostProfile2.yaml")
def test_invalidate_hostProfile(self, input_files):
self._test_validate('hostProfile.yaml', True, input_files, "invalid_hostProfile.yaml")
self._test_validate('hostProfile.yaml', True, input_files,
"invalid_hostProfile.yaml")
def test_invalidate_hostProfile2(self, input_files):
self._test_validate('hostProfile.yaml', True, input_files, "invalid_hostProfile2.yaml")
self._test_validate('hostProfile.yaml', True, input_files,
"invalid_hostProfile2.yaml")
def test_validate_network(self, input_files):
self._test_validate('network.yaml', False, input_files, "network.yaml")
def test_validate_network2(self, input_files):
self._test_validate('network.yaml', False, input_files, "network2.yaml")
self._test_validate('network.yaml', False, input_files,
"network2.yaml")
def test_validate_network3(self, input_files):
self._test_validate('network.yaml', False, input_files, "network3.yaml")
self._test_validate('network.yaml', False, input_files,
"network3.yaml")
def test_validate_network4(self, input_files):
self._test_validate('network.yaml', False, input_files, "network4.yaml")
self._test_validate('network.yaml', False, input_files,
"network4.yaml")
def test_validate_network5(self, input_files):
self._test_validate('network.yaml', False, input_files, "network5.yaml")
self._test_validate('network.yaml', False, input_files,
"network5.yaml")
def test_invalidate_network(self, input_files):
self._test_validate('network.yaml', True, input_files, "invalid_network.yaml")
self._test_validate('network.yaml', True, input_files,
"invalid_network.yaml")
def test_invalidate_network2(self, input_files):
self._test_validate('network.yaml', True, input_files, "invalid_network2.yaml")
self._test_validate('network.yaml', True, input_files,
"invalid_network2.yaml")
def test_invalidate_network3(self, input_files):
self._test_validate('network.yaml', True, input_files, "invalid_network3.yaml")
self._test_validate('network.yaml', True, input_files,
"invalid_network3.yaml")
def test_invalidate_network4(self, input_files):
self._test_validate('network.yaml', True, input_files, "invalid_network4.yaml")
self._test_validate('network.yaml', True, input_files,
"invalid_network4.yaml")
def test_invalidate_network5(self, input_files):
self._test_validate('network.yaml', True, input_files, "invalid_network5.yaml")
self._test_validate('network.yaml', True, input_files,
"invalid_network5.yaml")
def test_validate_networkLink(self, input_files):
self._test_validate('networkLink.yaml', False, input_files, "networkLink.yaml")
self._test_validate('networkLink.yaml', False, input_files,
"networkLink.yaml")
def test_validate_networkLink2(self, input_files):
self._test_validate('networkLink.yaml', False, input_files, "networkLink2.yaml")
self._test_validate('networkLink.yaml', False, input_files,
"networkLink2.yaml")
def test_validate_networkLink3(self, input_files):
self._test_validate('networkLink.yaml', False, input_files, "networkLink3.yaml")
self._test_validate('networkLink.yaml', False, input_files,
"networkLink3.yaml")
def test_invalidate_networkLink(self, input_files):
self._test_validate('networkLink.yaml', True, input_files, "invalid_networkLink.yaml")
self._test_validate('networkLink.yaml', True, input_files,
"invalid_networkLink.yaml")
def test_invalidate_networkLink2(self, input_files):
self._test_validate('networkLink.yaml', True, input_files, "invalid_networkLink2.yaml")
self._test_validate('networkLink.yaml', True, input_files,
"invalid_networkLink2.yaml")
def test_invalidate_networkLink3(self, input_files):
self._test_validate('networkLink.yaml', True, input_files, "invalid_networkLink3.yaml")
self._test_validate('networkLink.yaml', True, input_files,
"invalid_networkLink3.yaml")
def test_validate_region(self, input_files):
self._test_validate('region.yaml', False, input_files, "region.yaml")
def test_invalidate_region(self, input_files):
self._test_validate('region.yaml', True, input_files, "invalid_region.yaml")
self._test_validate('region.yaml', True, input_files,
"invalid_region.yaml")
def test_validate_rack(self, input_files):
self._test_validate('rack.yaml', False, input_files, "rack.yaml")
def test_invalidate_rack(self, input_files):
self._test_validate('rack.yaml', True, input_files, "invalid_rack.yaml")
self._test_validate('rack.yaml', True, input_files,
"invalid_rack.yaml")
@pytest.fixture(scope='module')
def input_files(self, tmpdir_factory, request):

View File

@ -18,12 +18,13 @@ from drydock_provisioner.orchestrator.validations.validator import Validator
import re
class TestRationalNetworkLinkBond(object):
def test_rational_network_bond(self, mocker, deckhand_ingester, drydock_state, input_files):
def test_rational_network_bond(self, mocker, deckhand_ingester,
drydock_state, input_files):
input_file = input_files.join("rational_network_bond.yaml")
design_ref = "file://%s" % str(input_file)
orch = Orchestrator(state_manager=drydock_state, ingester=deckhand_ingester)
orch = Orchestrator(
state_manager=drydock_state, ingester=deckhand_ingester)
status, site_design = Orchestrator.get_effective_site(orch, design_ref)
@ -34,12 +35,13 @@ class TestRationalNetworkLinkBond(object):
assert msg.get('error') is False
assert len(message_list) == 1
def test_invalid_rational_network_bond(self, mocker, deckhand_ingester, drydock_state, input_files):
def test_invalid_rational_network_bond(self, mocker, deckhand_ingester,
drydock_state, input_files):
input_file = input_files.join("invalid_rational_network_bond.yaml")
design_ref = "file://%s" % str(input_file)
orch = Orchestrator(state_manager=drydock_state, ingester=deckhand_ingester)
orch = Orchestrator(
state_manager=drydock_state, ingester=deckhand_ingester)
status, site_design = Orchestrator.get_effective_site(orch, design_ref)

View File

@ -18,12 +18,13 @@ from drydock_provisioner.orchestrator.validations.validator import Validator
import re
class TestRationalNetworkTrunking(object):
def test_rational_network_trunking(self, deckhand_ingester, drydock_state, input_files):
def test_rational_network_trunking(self, deckhand_ingester, drydock_state,
input_files):
input_file = input_files.join("rational_network_trunking.yaml")
design_ref = "file://%s" % str(input_file)
orch = Orchestrator(state_manager=drydock_state, ingester=deckhand_ingester)
orch = Orchestrator(
state_manager=drydock_state, ingester=deckhand_ingester)
status, site_design = Orchestrator.get_effective_site(orch, design_ref)
@ -33,12 +34,13 @@ class TestRationalNetworkTrunking(object):
assert msg.get('message') == 'Rational Network Trunking'
assert msg.get('error') is False
def test_invalid_rational_network_trunking(self, deckhand_ingester, drydock_state, input_files):
def test_invalid_rational_network_trunking(self, deckhand_ingester,
drydock_state, input_files):
input_file = input_files.join("invalid_rational_network_trunking.yaml")
design_ref = "file://%s" % str(input_file)
orch = Orchestrator(state_manager=drydock_state, ingester=deckhand_ingester)
orch = Orchestrator(
state_manager=drydock_state, ingester=deckhand_ingester)
status, site_design = Orchestrator.get_effective_site(orch, design_ref)

View File

@ -17,13 +17,15 @@ from drydock_provisioner.orchestrator.orchestrator import Orchestrator
from drydock_provisioner.orchestrator.validations.validator import Validator
import re
class TestRationalStoragePartitioning(object):
def test_storage_partitioning(self, deckhand_ingester, drydock_state, input_files):
class TestRationalNetworkTrunking(object):
def test_storage_partitioning(self, deckhand_ingester, drydock_state,
input_files):
input_file = input_files.join("storage_partitioning.yaml")
design_ref = "file://%s" % str(input_file)
orch = Orchestrator(state_manager=drydock_state, ingester=deckhand_ingester)
orch = Orchestrator(
state_manager=drydock_state, ingester=deckhand_ingester)
status, site_design = Orchestrator.get_effective_site(orch, design_ref)
@ -35,7 +37,6 @@ class TestRationalStoragePartitioning(object):
assert msg.get('error') is False
def test_storage_partitioning_unassigned_partition(self, deckhand_ingester, drydock_state, input_files):
input_file = input_files.join("storage_partitioning_unassigned_partition.yaml")
design_ref = "file://%s" % str(input_file)
@ -50,12 +51,13 @@ class TestRationalStoragePartitioning(object):
assert msg.get('message') == 'Storage Partitioning'
assert msg.get('error') is False
def test_invalid_storage_partitioning(self, deckhand_ingester, drydock_state, input_files):
def test_invalid_storage_partitioning(self, deckhand_ingester,
drydock_state, input_files):
input_file = input_files.join("invalid_storage_partitioning.yaml")
design_ref = "file://%s" % str(input_file)
orch = Orchestrator(state_manager=drydock_state, ingester=deckhand_ingester)
orch = Orchestrator(
state_manager=drydock_state, ingester=deckhand_ingester)
status, site_design = Orchestrator.get_effective_site(orch, design_ref)

View File

@ -23,7 +23,8 @@ class TestUniqueNetwork(object):
input_file = input_files.join("unique_network.yaml")
design_ref = "file://%s" % str(input_file)
orch = Orchestrator(state_manager=drydock_state, ingester=deckhand_ingester)
orch = Orchestrator(
state_manager=drydock_state, ingester=deckhand_ingester)
status, site_design = Orchestrator.get_effective_site(orch, design_ref)
@ -39,7 +40,8 @@ class TestUniqueNetwork(object):
input_file = input_files.join("invalid_unique_network.yaml")
design_ref = "file://%s" % str(input_file)
orch = Orchestrator(state_manager=drydock_state, ingester=deckhand_ingester)
orch = Orchestrator(
state_manager=drydock_state, ingester=deckhand_ingester)
status, site_design = Orchestrator.get_effective_site(orch, design_ref)