Adds tests for Spyglass data objects

Adds unit tests for all data objects created in [0]. The changes in [0]
were merged in short succession with [1], causing the test coverage gate
to fail with all the newly introduced lines of code. This change adds
tests for all of the newly added code and increases test coverage
requirement to 60% (currently at 65.93%).

[0] https://review.opendev.org/#/c/658917/
[1] https://review.opendev.org/#/c/663729/

Change-Id: I96931e3e415af80ca5ab9202c2bda0344a9901f0
This commit is contained in:
Ian H. Pittwood 2019-06-18 15:15:28 -05:00
parent 6b8d6f2aae
commit 313058b8fb
3 changed files with 847 additions and 15 deletions

View File

@ -57,7 +57,7 @@ class ServerList(object):
def __iter__(self):
yield from self.servers
def merge(self, server_list: str):
def merge(self, server_list):
"""Merges a comma separated server list into the object
This method is used to merge additional servers into the list. This is
@ -106,7 +106,7 @@ class IPList(object):
'overlay': self.overlay,
'pxe': self.pxe,
'storage': self.storage
}
}.items()
def set_ip_by_role(self, role: str, new_value):
if role == 'oob':
@ -187,12 +187,14 @@ class Host(object):
return {
self.name: {
'host_profile': self.host_profile,
'ip': self.ip,
'ip': self.ip.dict_from_class(),
'type': self.type
}
}
def merge_additional_data(self, config_dict: dict):
if 'rack_name' in config_dict:
self.rack_name = config_dict['rack_name']
if 'type' in config_dict:
self.type = config_dict['type']
if 'host_profile' in config_dict:
@ -218,7 +220,7 @@ class Rack(object):
"""Creates a writeable dict structure from the object"""
rack_as_dict = {self.name: {}}
for host in self.hosts:
rack_as_dict.update(host.dict_from_class())
rack_as_dict[self.name].update(host.dict_from_class())
return rack_as_dict
def merge_additional_data(self, config_dict: dict):
@ -257,9 +259,16 @@ class VLANNetworkData(object):
:Keyword Arguments:
* *role* (``str``) - Role of the data entry, defaults to name
* *vlan* (``str``, ``int``) -
* *type* (``str``) - Host type
* *ip* (``IPList``) - List of IP addresses for baremetal host
* *vlan* (``str``, ``int``) - virtual LAN ID number as str or int
* *subnet* (``list``) - list of subnet IP addresses as strings
* *routes* (``list``) - list of routes IP addresses as strings
* *gateway* - gateway address
* *dhcp_start* - DHCP range start
* *dhcp_end* - DHCP range end
* *static_start* - static IP range start
* *static_end* - static IP range end
* *reserved_start* - reserved IP range start
* *reserved_end* - reserved IP range end
"""
self.name = name
self.role = kwargs.get('role', self.name)
@ -307,6 +316,8 @@ class VLANNetworkData(object):
return vlan_dict
def merge_additional_data(self, config_dict: dict):
if 'role' in config_dict:
self.role = config_dict['role']
if 'vlan' in config_dict:
self.vlan = config_dict['vlan']
if 'subnet' in config_dict:
@ -320,15 +331,15 @@ class VLANNetworkData(object):
if 'dhcp_start' in config_dict:
self.dhcp_start = config_dict['dhcp_start']
if 'dhcp_end' in config_dict:
self.dhcp_start = config_dict['dhcp_end']
self.dhcp_end = config_dict['dhcp_end']
if 'static_start' in config_dict:
self.dhcp_start = config_dict['static_start']
self.static_start = config_dict['static_start']
if 'static_end' in config_dict:
self.dhcp_start = config_dict['static_end']
self.static_end = config_dict['static_end']
if 'reserved_start' in config_dict:
self.dhcp_start = config_dict['reserved_start']
self.reserved_start = config_dict['reserved_start']
if 'reserved_end' in config_dict:
self.dhcp_start = config_dict['reserved_end']
self.reserved_end = config_dict['reserved_end']
class Network(object):
@ -445,11 +456,11 @@ class SiteInfo(object):
return {
'corridor': self.corridor,
'country': self.country,
'dns': self.dns,
'dns': str(self.dns),
'domain': self.domain,
'ldap': self.ldap,
'name': self.name,
'ntp': self.ntp,
'ntp': str(self.ntp),
'physical_location_id': self.physical_location_id,
'sitetype': self.sitetype,
'state': self.state,

View File

@ -0,0 +1,821 @@
# Copyright 2019 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from copy import copy
import unittest
from unittest import mock
from spyglass.data_extractor import models
class TestParseIp(unittest.TestCase):
"""Tests the _parse_ip validator for Spyglass models"""
def test__parse_ip(self):
"""Tests basic function of _parse_ip validator"""
addr = '10.23.0.1'
result = models._parse_ip(addr)
self.assertEqual(addr, result)
def test__parse_ip_bad_ip(self):
"""Tests that invalid IP addresses are logged as a warning and returned
without changes
"""
addr = 'not4nip4ddr3$$'
expected_message = '%s is not a valid IP address.' % addr
with self.assertLogs(level='WARNING') as test_log:
result = models._parse_ip(addr)
self.assertEqual(len(test_log.output), 1)
self.assertEqual(len(test_log.records), 1)
self.assertIn(expected_message, test_log.output[0])
self.assertEqual(addr, result)
class TestServerList(unittest.TestCase):
"""Tests for the ServerList model"""
VALID_SERVERS = ['121.12.13.1', '193.153.1.1', '12.23.9.11']
INVALID_SERVERS = ['not4nip4ddr3$$', '124.34.1.1', 'ALSONOTVALID']
def test___init__(self):
"""Tests basic initialization of ServerList"""
result = models.ServerList(self.VALID_SERVERS)
self.assertEqual(set(self.VALID_SERVERS), set(result.servers))
def test___init___invalid_servers(self):
"""Tests initialization of ServerList with bad IPs
If ServerList is given IP addresses that appear invalid, it should log
warnings for the user. ServerList should still initialize regardless
of the IPs it is given.
"""
expected_messages = [
'%s is not a valid IP address.' % self.INVALID_SERVERS[0],
'%s is not a valid IP address.' % self.INVALID_SERVERS[2]
]
with self.assertLogs(level='WARNING') as test_log:
result = models.ServerList(self.INVALID_SERVERS)
self.assertEqual(len(test_log.output), 2)
self.assertEqual(len(test_log.records), 2)
self.assertIn(expected_messages[0], test_log.output[0])
self.assertIn(expected_messages[1], test_log.output[1])
self.assertEqual(set(self.INVALID_SERVERS), set(result.servers))
def test___str__(self):
"""Tests str type casting for ServerList"""
expected_result = ','.join(self.VALID_SERVERS)
result = models.ServerList(self.VALID_SERVERS)
self.assertEqual(expected_result, str(result))
def test___iter__(self):
"""Tests iterator builtin for ServerList"""
result = models.ServerList(self.VALID_SERVERS)
iterator = iter(result)
self.assertEqual(iterator.__next__(), self.VALID_SERVERS[0])
self.assertEqual(iterator.__next__(), self.VALID_SERVERS[1])
self.assertEqual(iterator.__next__(), self.VALID_SERVERS[2])
def test_merge_list(self):
"""Tests that ServerList can merge additional servers from a list
ServerList should be able to accept a list type object of IP address
strings and merge them into its current list of servers.
"""
list_to_merge = ['1.1.1.1', '2.2.2.2']
result = models.ServerList(self.VALID_SERVERS)
self.assertEqual(set(self.VALID_SERVERS), set(result.servers))
expected_result = [*self.VALID_SERVERS, *list_to_merge]
result.merge(list_to_merge)
self.assertEqual(set(expected_result), set(result.servers))
def test_merge_str(self):
"""Tests that ServerList can merge additional servers from a str
ServerList should be able to accept a comma separated list of IP
addresses as strings and merge them into its current list of servers.
"""
list_to_merge = '1.1.1.1,2.2.2.2'
result = models.ServerList(self.VALID_SERVERS)
self.assertEqual(set(self.VALID_SERVERS), set(result.servers))
expected_result = [*self.VALID_SERVERS, *(list_to_merge.split(','))]
result.merge(list_to_merge)
self.assertEqual(set(expected_result), set(result.servers))
class TestIPList(unittest.TestCase):
"""Tests for the IPList model"""
VALID_IP = {
'oob': '14.102.252.126',
'oam': '120.145.167.87',
'calico': '46.12.178.235',
'overlay': '226.208.39.49',
'pxe': '164.99.192.149',
'storage': '252.63.220.22'
}
INVALID_IP = {
'oob': '14.102.252.126',
'oam': 'not4nip4ddr3$$',
'calico': '46.12.178.235',
'overlay': '226.208.39.49',
'pxe': '164.99.192.149',
'storage': '252.63.220.22'
}
MISSING_IP = {
'oob': '14.102.252.126',
'calico': '46.12.178.235',
'overlay': '226.208.39.49',
'pxe': '164.99.192.149',
'storage': '252.63.220.22'
}
def test___init__(self):
"""Tests basic initialization of an IPList"""
result = models.IPList(**self.VALID_IP)
self.assertEqual(self.VALID_IP['oob'], result.oob)
self.assertEqual(self.VALID_IP['oam'], result.oam)
self.assertEqual(self.VALID_IP['calico'], result.calico)
self.assertEqual(self.VALID_IP['overlay'], result.overlay)
self.assertEqual(self.VALID_IP['pxe'], result.pxe)
self.assertEqual(self.VALID_IP['storage'], result.storage)
def test___init___invalid_ip(self):
"""Tests initialization of an IPList using invalid IPs
When invalid IP addresses are given to IPList, it should log a warning
to the user using _parse_ip(). IPList should still be created using
the invalid address.
"""
expected_message = \
'%s is not a valid IP address.' % self.INVALID_IP['oam']
with self.assertLogs(level='WARNING') as test_log:
result = models.IPList(**self.INVALID_IP)
self.assertEqual(len(test_log.output), 1)
self.assertEqual(len(test_log.records), 1)
self.assertIn(expected_message, test_log.output[0])
self.assertEqual(self.INVALID_IP['oob'], result.oob)
self.assertEqual(self.INVALID_IP['oam'], result.oam)
self.assertEqual(self.INVALID_IP['calico'], result.calico)
self.assertEqual(self.INVALID_IP['overlay'], result.overlay)
self.assertEqual(self.INVALID_IP['pxe'], result.pxe)
self.assertEqual(self.INVALID_IP['storage'], result.storage)
def test___init___missing_ip(self):
"""Tests initialization of an IPList with an entry missing
IPList should automatically fill in any missing entries with the value
set by models.DATA_DEFAULT.
"""
expected_message = \
'%s is not a valid IP address.' % models.DATA_DEFAULT
with self.assertLogs(level='WARNING') as test_log:
result = models.IPList(**self.MISSING_IP)
self.assertEqual(len(test_log.output), 1)
self.assertEqual(len(test_log.records), 1)
self.assertIn(expected_message, test_log.output[0])
self.assertEqual(self.MISSING_IP['oob'], result.oob)
self.assertEqual(models.DATA_DEFAULT, result.oam)
self.assertEqual(self.MISSING_IP['calico'], result.calico)
self.assertEqual(self.MISSING_IP['overlay'], result.overlay)
self.assertEqual(self.MISSING_IP['pxe'], result.pxe)
self.assertEqual(self.MISSING_IP['storage'], result.storage)
def test___iter__(self):
"""Tests iterator builtin for IPList
When iter() is called on an IPList, it should yield key-value pairs of
each role and its associated IP address.
"""
result = models.IPList(**self.VALID_IP)
for key, value in result.__iter__():
self.assertIn(key, self.VALID_IP)
self.assertEqual(self.VALID_IP[key], value)
def test_set_ip_by_role(self):
"""Tests setting a single IP by role"""
result = models.IPList(**self.VALID_IP)
new_calico_ip = '87.85.178.249'
result.set_ip_by_role('calico', new_calico_ip)
self.assertEqual(self.VALID_IP['oob'], result.oob)
self.assertEqual(self.VALID_IP['oam'], result.oam)
self.assertEqual(new_calico_ip, result.calico)
self.assertEqual(self.VALID_IP['overlay'], result.overlay)
self.assertEqual(self.VALID_IP['pxe'], result.pxe)
self.assertEqual(self.VALID_IP['storage'], result.storage)
def test_set_ip_by_role_invalid_role(self):
"""Tests setting an invalid role's IP
Attempting to set an invalid role should log a warning to the user and
do nothing to the IPList's data.
"""
result = models.IPList(**self.VALID_IP)
new_ip = '87.85.178.249'
role = 'DNE'
expected_message = '%s role is not defined for IPList.' % role
with self.assertLogs(level='WARNING') as test_log:
result.set_ip_by_role(role, new_ip)
self.assertEqual(len(test_log.output), 1)
self.assertEqual(len(test_log.records), 1)
self.assertIn(expected_message, test_log.output[0])
self.assertEqual(self.VALID_IP['oob'], result.oob)
self.assertEqual(self.VALID_IP['oam'], result.oam)
self.assertEqual(self.VALID_IP['calico'], result.calico)
self.assertEqual(self.VALID_IP['overlay'], result.overlay)
self.assertEqual(self.VALID_IP['pxe'], result.pxe)
self.assertEqual(self.VALID_IP['storage'], result.storage)
def test_dict_from_class(self):
"""Tests production of a dictionary from IPList"""
ip_list = models.IPList(**self.VALID_IP)
result = ip_list.dict_from_class()
self.assertDictEqual(self.VALID_IP, result)
def test_dict_from_class_missing_ip(self):
"""Tests production of a dictionary from IPList with a missing IP
If an IP address is not set for the IPList, it should not appear in the
dictionary output.
"""
missing_ip = copy(self.MISSING_IP)
missing_ip['oam'] = ''
ip_list = models.IPList(**missing_ip)
result = ip_list.dict_from_class()
self.assertDictEqual(self.MISSING_IP, result)
def test_merge_additional_data(self):
"""Tests merging of additional data dictionaries
Tests that merging of additional data will set a missing role's IP.
"""
config_dict = {'oam': '87.85.178.249'}
ip_list = models.IPList(**self.MISSING_IP)
ip_list.merge_additional_data(config_dict)
self.assertEqual(self.MISSING_IP['oob'], ip_list.oob)
self.assertEqual(config_dict['oam'], ip_list.oam)
self.assertEqual(self.MISSING_IP['calico'], ip_list.calico)
self.assertEqual(self.MISSING_IP['overlay'], ip_list.overlay)
self.assertEqual(self.MISSING_IP['pxe'], ip_list.pxe)
self.assertEqual(self.MISSING_IP['storage'], ip_list.storage)
class TestHost(unittest.TestCase):
"""Tests for the Host model"""
HOST_NAME = 'test_host1'
HOST_DATA = {
'rack_name': 'rack01',
'host_profile': 'host',
'type': 'compute'
}
@mock.patch('spyglass.data_extractor.models.IPList', autospec=True)
def setUp(self, MockIPList):
"""Initializes a mocked IPList"""
self.MockIPList = MockIPList
self.HOST_DATA['ip'] = MockIPList()
self.HOST_DATA['ip'].dict_from_class.return_value = 'success'
def test___init__(self):
"""Tests basic initialization of Host"""
result = models.Host(self.HOST_NAME, **self.HOST_DATA)
self.assertEqual(self.HOST_NAME, result.name)
self.assertEqual(self.HOST_DATA['rack_name'], result.rack_name)
self.assertEqual(self.HOST_DATA['host_profile'], result.host_profile)
self.assertEqual(self.HOST_DATA['type'], result.type)
self.assertEqual(self.HOST_DATA['ip'], result.ip)
def test___init___missing_data(self):
"""Tests initialization of Host with missing data
Unless an attribute is required, Host should automatically fill in the
value set by models.DATA_DEFAULT for each attribute. The only exception
is for the Host's ip attribute which will be a new instance of IPList.
"""
result = models.Host(self.HOST_NAME)
self.assertEqual(self.HOST_NAME, result.name)
self.assertEqual(models.DATA_DEFAULT, result.rack_name)
self.assertEqual(models.DATA_DEFAULT, result.host_profile)
self.assertEqual(models.DATA_DEFAULT, result.type)
self.assertIsInstance(result.ip, models.IPList)
def test_dict_from_class(self):
"""Tests production of a dictionary from a Host object"""
expected_result = {
self.HOST_NAME: {
'host_profile': self.HOST_DATA['host_profile'],
'ip': 'success',
'type': self.HOST_DATA['type']
}
}
host = models.Host(self.HOST_NAME, **self.HOST_DATA)
result = host.dict_from_class()
self.assertDictEqual(expected_result, result)
def test_merge_additional_data(self):
"""Tests merging of an additional data dictionary into a Host object"""
config_dict = copy(self.HOST_DATA)
config_dict['ip'] = 'success'
result = models.Host(self.HOST_NAME)
self.assertEqual(self.HOST_NAME, result.name)
self.assertEqual(models.DATA_DEFAULT, result.rack_name)
self.assertEqual(models.DATA_DEFAULT, result.host_profile)
self.assertEqual(models.DATA_DEFAULT, result.type)
self.assertIsInstance(result.ip, models.IPList)
result.merge_additional_data(config_dict)
self.assertEqual(self.HOST_NAME, result.name)
self.assertEqual(config_dict['rack_name'], result.rack_name)
self.assertEqual(config_dict['host_profile'], result.host_profile)
self.assertEqual(config_dict['type'], result.type)
self.assertEqual(config_dict['ip'], 'success')
class TestRack(unittest.TestCase):
"""Tests for the Rack model"""
RACK_NAME = 'test_rack1'
HOST_DATA = {
'rack_name': RACK_NAME,
'host_profile': 'host',
'type': 'compute'
}
@mock.patch('spyglass.data_extractor.models.IPList', autospec=True)
def setUp(self, MockIPList):
"""Sets up a mocked IPList and a list of Host objects for testing"""
self.MockIPList = MockIPList
self.HOST_DATA['ip'] = MockIPList()
self.HOST_DATA['ip'].dict_from_class.return_value = 'success'
self.hosts = [
models.Host('test_host1', **self.HOST_DATA),
models.Host('test_host2', **self.HOST_DATA),
models.Host('test_host3', **self.HOST_DATA),
]
def test___init__(self):
"""Tests basic initialization of Rack"""
result = models.Rack(self.RACK_NAME, self.hosts)
self.assertEqual(self.RACK_NAME, result.name)
self.assertEqual(self.hosts, result.hosts)
def test_dict_from_class(self):
"""Tests production of a dictionary from a Rack object"""
expected_result = {self.RACK_NAME: {}}
expected_result[self.RACK_NAME].update(self.hosts[0].dict_from_class())
expected_result[self.RACK_NAME].update(self.hosts[1].dict_from_class())
expected_result[self.RACK_NAME].update(self.hosts[2].dict_from_class())
result = models.Rack(self.RACK_NAME, self.hosts)
self.assertDictEqual(expected_result, result.dict_from_class())
def test_merge_additional_data_new_host(self):
"""Tests merging of data containing a new host
If the additional data dictionary contains a host not already contained
in Rack.hosts, Rack should add the host to the list.
"""
config_dict = {'test_host4': {**self.HOST_DATA}}
result = models.Rack(self.RACK_NAME, self.hosts)
self.assertIsNone(result.get_host_by_name('test_host4'))
result.merge_additional_data(config_dict)
self.assertIsNotNone(result.get_host_by_name('test_host4'))
def test_merge_additional_data_existing_host(self):
"""Tests merging of data containing data for an existing host
If the additional data dictionary contains a host already contained in
Rack.hosts, Rack should call merge_additional_data on the existing host
with the new data.
"""
config_dict = {'test_host1': {'host_profile': 'new_profile'}}
result = models.Rack(self.RACK_NAME, self.hosts)
self.assertEqual(
self.HOST_DATA['host_profile'],
result.get_host_by_name('test_host1').host_profile)
result.merge_additional_data(config_dict)
self.assertEqual(
config_dict['test_host1']['host_profile'],
result.get_host_by_name('test_host1').host_profile)
def test_get_host_by_name(self):
"""Tests retrieval of a Rack's host by name"""
result = models.Rack(self.RACK_NAME, self.hosts)
self.assertEqual(
self.hosts[1], result.get_host_by_name(self.hosts[1].name))
class TestVLANNetworkData(unittest.TestCase):
"""Tests for the VLANNetworkData model"""
VLAN_NAME = 'test'
VLAN_DATA = {
'role': 'oam',
'vlan': '23',
'subnet': ['210.27.143.213', '127.13.31.192'],
'routes': ['29.190.93.106', '252.240.25.174'],
'gateway': '204.70.95.80',
'dhcp_start': '88.9.225.29',
'dhcp_end': '71.31.147.105',
'static_start': '117.137.102.246',
'static_end': '176.20.227.186',
'reserved_start': '229.171.15.171',
'reserved_end': '230.187.248.100'
}
def test___init__(self):
"""Tests basic initialization of VLANNetworkData"""
result = models.VLANNetworkData(self.VLAN_NAME, **self.VLAN_DATA)
self.assertEqual(self.VLAN_NAME, result.name)
self.assertEqual(self.VLAN_DATA['role'], result.role)
self.assertEqual(self.VLAN_DATA['vlan'], result.vlan)
self.assertEqual(self.VLAN_DATA['subnet'], result.subnet)
self.assertEqual(self.VLAN_DATA['routes'], result.routes)
self.assertEqual(self.VLAN_DATA['gateway'], result.gateway)
self.assertEqual(self.VLAN_DATA['dhcp_start'], result.dhcp_start)
self.assertEqual(self.VLAN_DATA['dhcp_end'], result.dhcp_end)
self.assertEqual(self.VLAN_DATA['static_start'], result.static_start)
self.assertEqual(self.VLAN_DATA['static_end'], result.static_end)
self.assertEqual(
self.VLAN_DATA['reserved_start'], result.reserved_start)
self.assertEqual(self.VLAN_DATA['reserved_end'], result.reserved_end)
def test___init___missing_data(self):
"""Tests initialization of VLANNetworkData with missing data
Any data not explicitly given to VLANNetworkData should be set to None
or an empty list. Since VLANNetworkData can contain a variety of
different settings, many of which are not required, most of these
settings default to None so they will not be outputted when exporting
to a dictionary.
"""
result = models.VLANNetworkData(self.VLAN_NAME)
self.assertEqual(self.VLAN_NAME, result.name)
self.assertEqual(self.VLAN_NAME, result.role)
self.assertIsNone(result.vlan)
self.assertEqual([], result.subnet)
self.assertEqual([], result.routes)
self.assertIsNone(result.dhcp_start)
self.assertIsNone(result.dhcp_end)
self.assertIsNone(result.static_start)
self.assertIsNone(result.static_end)
self.assertIsNone(result.reserved_start)
self.assertIsNone(result.reserved_end)
def test_dict_from_class(self):
"""Tests production of a dictionary from a VLANNetworkData object"""
copy_vlan_data = copy(self.VLAN_DATA)
copy_vlan_data.pop('role')
expected_result = {self.VLAN_DATA['role']: {**copy_vlan_data}}
result = models.VLANNetworkData(self.VLAN_NAME, **self.VLAN_DATA)
self.assertDictEqual(expected_result, result.dict_from_class())
def test_merge_additional_data(self):
"""Tests merging of additional data into a VLANNetworkData object"""
result = models.VLANNetworkData(self.VLAN_NAME)
self.assertEqual(self.VLAN_NAME, result.name)
self.assertEqual(self.VLAN_NAME, result.role)
self.assertIsNone(result.vlan)
self.assertEqual([], result.subnet)
self.assertEqual([], result.routes)
self.assertIsNone(result.dhcp_start)
self.assertIsNone(result.dhcp_end)
self.assertIsNone(result.static_start)
self.assertIsNone(result.static_end)
self.assertIsNone(result.reserved_start)
self.assertIsNone(result.reserved_end)
result.merge_additional_data(self.VLAN_DATA)
self.assertEqual(self.VLAN_NAME, result.name)
self.assertEqual(self.VLAN_DATA['role'], result.role)
self.assertEqual(self.VLAN_DATA['vlan'], result.vlan)
self.assertEqual(self.VLAN_DATA['subnet'], result.subnet)
self.assertEqual(self.VLAN_DATA['routes'], result.routes)
self.assertEqual(self.VLAN_DATA['gateway'], result.gateway)
self.assertEqual(self.VLAN_DATA['dhcp_start'], result.dhcp_start)
self.assertEqual(self.VLAN_DATA['dhcp_end'], result.dhcp_end)
self.assertEqual(self.VLAN_DATA['static_start'], result.static_start)
self.assertEqual(self.VLAN_DATA['static_end'], result.static_end)
self.assertEqual(
self.VLAN_DATA['reserved_start'], result.reserved_start)
self.assertEqual(self.VLAN_DATA['reserved_end'], result.reserved_end)
class TestNetwork(unittest.TestCase):
"""Tests for the Network model"""
VLAN_DATA = {
'vlan': '23',
'subnet': ['210.27.143.213', '127.13.31.192'],
'routes': ['29.190.93.106', '252.240.25.174'],
'gateway': '204.70.95.80',
'dhcp_start': '88.9.225.29',
'dhcp_end': '71.31.147.105',
'static_start': '117.137.102.246',
'static_end': '176.20.227.186',
'reserved_start': '229.171.15.171',
'reserved_end': '230.187.248.100'
}
BGP_DATA = {
'asnumber': 64671,
'ingress_vip': '10.0.220.73',
'peer_asnumber': 64688
}
def setUp(self):
"""Sets up a list of VLANNetworkData used to test Network objects"""
self.vlan_network_data = [
models.VLANNetworkData('oam', **self.VLAN_DATA),
models.VLANNetworkData('oob', **self.VLAN_DATA),
models.VLANNetworkData('pxe', **self.VLAN_DATA)
]
def test___init__(self):
"""Tests basic initialization of a Network object"""
result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA)
self.assertEqual(
set(self.vlan_network_data), set(result.vlan_network_data))
self.assertDictEqual(self.BGP_DATA, result.bgp)
def test_dict_from_class(self):
"""Tests production of a dictionary from a Network object"""
expected_result = {
'bgp': self.BGP_DATA,
'vlan_network_data': {
'oam': {
**self.VLAN_DATA
},
'oob': {
**self.VLAN_DATA
},
'pxe': {
**self.VLAN_DATA
},
}
}
result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA)
self.assertDictEqual(expected_result, result.dict_from_class())
def test_merge_additional_data_bgp(self):
"""Tests merging additional BGP data
BGP data is often set after the initial generation of data objects.
"""
result = models.Network(self.vlan_network_data)
self.assertEqual({}, result.bgp)
result.merge_additional_data({'bgp': self.BGP_DATA})
self.assertEqual(self.BGP_DATA, result.bgp)
def test_merge_additional_data_new_vlan_data(self):
"""Tests merging of data containing data for a new VLANNetworkData obj
If a new set of VLANNetworkData is introduced in additional data,
Network should create a new VLANNetworkData object for its list.
"""
result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA)
self.assertIsNone(result.get_vlan_data_by_name('calico'))
new_calico_vlan = {'vlan_network_data': {'calico': {**self.VLAN_DATA}}}
result.merge_additional_data(new_calico_vlan)
self.assertIsNotNone(result.get_vlan_data_by_name('calico'))
def test_merge_additional_data_new_data_for_existing_vlan(self):
"""Tests merging of data for an existing VLANNetworkData object
If a new set of data for an existing VLANNetworkData role is given by
additional data, Network should merge this new data into the existing
VLANNetworkData object.
"""
result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA)
self.assertEqual(
self.VLAN_DATA['vlan'],
result.get_vlan_data_by_name('oob').vlan)
new_vlan_data = {'vlan_network_data': {'oob': {'vlan': '12'}}}
result.merge_additional_data(new_vlan_data)
self.assertEqual(
new_vlan_data['vlan_network_data']['oob']['vlan'],
result.get_vlan_data_by_name('oob').vlan)
def test_get_vlan_data_by_name(self):
"""Tests retrieval of VLANNetworkData by name attribute"""
result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA)
self.assertEqual(
self.vlan_network_data[1], result.get_vlan_data_by_name('oob'))
def test_get_vlan_data_by_name_dne(self):
"""Tests retrieval of nonexistent name VLANNetworkData"""
result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA)
self.assertIsNone(result.get_vlan_data_by_name('calico'))
def test_get_vlan_data_by_role(self):
"""Tests retrieval of VLANNetworkData by role attribute"""
result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA)
self.assertEqual(
self.vlan_network_data[1], result.get_vlan_data_by_role('oob'))
def test_get_vlan_data_by_role_dne(self):
"""Tests retrieval of nonexistent role VLANNetworkData"""
result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA)
self.assertIsNone(result.get_vlan_data_by_role('calico'))
class TestSiteInfo(unittest.TestCase):
"""Tests for the SiteInfo model"""
SITE_NAME = 'Test Site'
SITE_INFO = {
'physical_location_id': 12345,
'state': 'MO',
'country': 'USA',
'corridor': 'C1',
'sitetype': 'test',
'dns': ['210.27.143.213', '127.13.31.192'],
'ntp': ['29.190.93.106', '252.240.25.174'],
'domain': 'example.com',
'ldap': {
'common_name': 'test',
'domain': 'example',
'subdomain': 'test',
'url': 'ldap://ldap.example.com'
}
}
def test___init__(self):
"""Tests basic initialization of SiteInfo"""
result = models.SiteInfo(self.SITE_NAME, **self.SITE_INFO)
self.assertEqual(self.SITE_NAME, result.name)
self.assertEqual(
self.SITE_INFO['physical_location_id'],
result.physical_location_id)
self.assertEqual(self.SITE_INFO['state'], result.state)
self.assertEqual(self.SITE_INFO['country'], result.country)
self.assertEqual(self.SITE_INFO['corridor'], result.corridor)
self.assertEqual(self.SITE_INFO['sitetype'], result.sitetype)
self.assertEqual(','.join(self.SITE_INFO['dns']), str(result.dns))
self.assertEqual(','.join(self.SITE_INFO['ntp']), str(result.ntp))
self.assertEqual(self.SITE_INFO['domain'], result.domain)
self.assertDictEqual(self.SITE_INFO['ldap'], result.ldap)
def test___init___missing_data(self):
"""Tests initailization of SiteInfo with missing data
If data is not given for SiteInfo attributes, the attributes should
be set to the value given by models.DATA_DEFAULT, an empty list, or
an empty dictionary.
"""
result = models.SiteInfo(self.SITE_NAME)
self.assertEqual(self.SITE_NAME, result.name)
self.assertEqual(models.DATA_DEFAULT, result.physical_location_id)
self.assertEqual(models.DATA_DEFAULT, result.state)
self.assertEqual(models.DATA_DEFAULT, result.country)
self.assertEqual(models.DATA_DEFAULT, result.corridor)
self.assertEqual(models.DATA_DEFAULT, result.sitetype)
self.assertEqual(','.join([]), str(result.dns))
self.assertEqual(','.join([]), str(result.ntp))
self.assertEqual(models.DATA_DEFAULT, result.domain)
self.assertDictEqual({}, result.ldap)
def test_dict_from_class(self):
"""Tests production of a dictionary from a SiteInfo object"""
expected_results = copy(self.SITE_INFO)
expected_results['dns'] = ','.join(expected_results['dns'])
expected_results['ntp'] = ','.join(expected_results['ntp'])
expected_results['name'] = self.SITE_NAME
result = models.SiteInfo(self.SITE_NAME, **self.SITE_INFO)
self.assertDictEqual(expected_results, result.dict_from_class())
def test_merge_additional_data(self):
"""Tests merging of additional data into SiteInfo"""
result = models.SiteInfo(self.SITE_NAME)
self.assertEqual(self.SITE_NAME, result.name)
self.assertEqual(models.DATA_DEFAULT, result.physical_location_id)
self.assertEqual(models.DATA_DEFAULT, result.state)
self.assertEqual(models.DATA_DEFAULT, result.country)
self.assertEqual(models.DATA_DEFAULT, result.corridor)
self.assertEqual(models.DATA_DEFAULT, result.sitetype)
self.assertEqual(','.join([]), str(result.dns))
self.assertEqual(','.join([]), str(result.ntp))
self.assertEqual(models.DATA_DEFAULT, result.domain)
self.assertDictEqual({}, result.ldap)
config_dict = copy(self.SITE_INFO)
config_dict['dns'] = {'servers': config_dict['dns']}
config_dict['ntp'] = {'servers': config_dict['ntp']}
config_dict['name'] = 'new_name'
result.merge_additional_data(config_dict)
self.assertEqual(config_dict['name'], result.name)
self.assertEqual(
self.SITE_INFO['physical_location_id'],
result.physical_location_id)
self.assertEqual(self.SITE_INFO['state'], result.state)
self.assertEqual(self.SITE_INFO['country'], result.country)
self.assertEqual(self.SITE_INFO['corridor'], result.corridor)
self.assertEqual(self.SITE_INFO['sitetype'], result.sitetype)
self.assertEqual(','.join(self.SITE_INFO['dns']), str(result.dns))
self.assertEqual(','.join(self.SITE_INFO['ntp']), str(result.ntp))
self.assertEqual(self.SITE_INFO['domain'], result.domain)
self.assertDictEqual(self.SITE_INFO['ldap'], result.ldap)
class TestSiteDocumentData(unittest.TestCase):
"""Tests for the SiteDocumentData model"""
STORAGE_DICT = {'ceph': {'controller': {'osd_count': 6}}}
@mock.patch('spyglass.data_extractor.models.SiteInfo')
@mock.patch('spyglass.data_extractor.models.Network')
@mock.patch('spyglass.data_extractor.models.Rack')
def test___init__(self, Rack, Network, SiteInfo):
"""Tests basic initialization of SiteDocumentData"""
site_info = SiteInfo()
network = Network()
baremetal = [Rack(), Rack(), Rack()]
result = models.SiteDocumentData(
site_info, network, baremetal, self.STORAGE_DICT)
self.assertEqual(site_info, result.site_info)
self.assertEqual(network, result.network)
self.assertEqual(set(baremetal), set(result.baremetal))
self.assertDictEqual(self.STORAGE_DICT, result.storage)
@mock.patch('spyglass.data_extractor.models.SiteInfo')
@mock.patch('spyglass.data_extractor.models.Network')
@mock.patch('spyglass.data_extractor.models.Rack')
def test_dict_from_class(self, Rack, Network, SiteInfo):
"""Tests production of a dictionary from a SiteDocumentData object"""
mock_site_info_data = {'name': 'test', 'country': 'USA'}
mock_network_data = {
'bgp': 'bgp_data',
'vlan_network_data': 'vlan_data'
}
mock_baremetal0_data = {'rack1': {'host1': 'data'}}
mock_baremetal1_data = {'rack2': {'host2': 'data'}}
site_info = SiteInfo()
site_info.dict_from_class.return_value = mock_site_info_data
network = Network()
network.dict_from_class.return_value = mock_network_data
baremetal = [Rack(), Rack()]
Rack().dict_from_class.side_effect = \
[mock_baremetal0_data, mock_baremetal1_data]
expected_result = {
'baremetal': {
**mock_baremetal0_data,
**mock_baremetal1_data
},
'network': mock_network_data,
'site_info': mock_site_info_data,
'storage': self.STORAGE_DICT
}
result = models.SiteDocumentData(
site_info, network, baremetal, self.STORAGE_DICT)
self.assertDictEqual(expected_result, result.dict_from_class())
@mock.patch('spyglass.data_extractor.models.SiteInfo')
@mock.patch('spyglass.data_extractor.models.Network')
@mock.patch('spyglass.data_extractor.models.Rack')
def test_merge_additional_data_storage(self, Rack, Network, SiteInfo):
"""Tests merging of storage data dictionary
Storage data is often given after initialization of data objects.
"""
site_info = SiteInfo()
network = Network()
baremetal = [Rack(), Rack(), Rack()]
result = models.SiteDocumentData(site_info, network, baremetal)
self.assertIsNone(result.storage)
result.merge_additional_data({'storage': self.STORAGE_DICT})
self.assertDictEqual(self.STORAGE_DICT, result.storage)
@mock.patch('spyglass.data_extractor.models.SiteInfo')
@mock.patch('spyglass.data_extractor.models.Network')
@mock.patch('spyglass.data_extractor.models.Rack')
def test_get_baremetal_rack_by_name(self, Rack, Network, SiteInfo):
"""Tests retrieval of baremetal rack by name"""
site_info = SiteInfo()
network = Network()
baremetal = [Rack(), Rack(), Rack()]
type(Rack()).name = mock.PropertyMock(
side_effect=['rack1', 'rack2', 'rack3'])
result = models.SiteDocumentData(site_info, network, baremetal)
self.assertIsNotNone(result.get_baremetal_rack_by_name('rack2'))
@mock.patch('spyglass.data_extractor.models.SiteInfo')
@mock.patch('spyglass.data_extractor.models.Network')
@mock.patch('spyglass.data_extractor.models.Rack')
def test_get_baremetal_rack_by_name_dne(self, Rack, Network, SiteInfo):
"""Tests retrieval of nonexistent baremetal rack by name"""
site_info = SiteInfo()
network = Network()
baremetal = [Rack(), Rack()]
type(Rack()).name = mock.PropertyMock(side_effect=['rack1', 'rack3'])
result = models.SiteDocumentData(site_info, network, baremetal)
self.assertIsNone(result.get_baremetal_rack_by_name('rack2'))

View File

@ -72,6 +72,6 @@ deps =
commands =
bash -c 'PATH=$PATH:~/.local/bin; pytest --cov=spyglass --cov-report \
html:cover --cov-report xml:cover/coverage.xml --cov-report term \
--cov-fail-under 50 tests/'
--cov-fail-under 60 tests/'
whitelist_externals =
bash