From 313058b8fbcb1c299c6952b243d8cd9da8338a49 Mon Sep 17 00:00:00 2001 From: "Ian H. Pittwood" Date: Tue, 18 Jun 2019 15:15:28 -0500 Subject: [PATCH] Adds tests for Spyglass data objects Adds unit tests for all data objects created in [0]. The changes in [0] were merged in short succession with [1], causing the test coverage gate to fail with all the newly introduced lines of code. This change adds tests for all of the newly added code and increases test coverage requirement to 60% (currently at 65.93%). [0] https://review.opendev.org/#/c/658917/ [1] https://review.opendev.org/#/c/663729/ Change-Id: I96931e3e415af80ca5ab9202c2bda0344a9901f0 --- spyglass/data_extractor/models.py | 39 +- tests/unit/data_extractor/test_models.py | 821 +++++++++++++++++++++++ tox.ini | 2 +- 3 files changed, 847 insertions(+), 15 deletions(-) create mode 100644 tests/unit/data_extractor/test_models.py diff --git a/spyglass/data_extractor/models.py b/spyglass/data_extractor/models.py index df8cc24..aabdc4e 100644 --- a/spyglass/data_extractor/models.py +++ b/spyglass/data_extractor/models.py @@ -57,7 +57,7 @@ class ServerList(object): def __iter__(self): yield from self.servers - def merge(self, server_list: str): + def merge(self, server_list): """Merges a comma separated server list into the object This method is used to merge additional servers into the list. This is @@ -106,7 +106,7 @@ class IPList(object): 'overlay': self.overlay, 'pxe': self.pxe, 'storage': self.storage - } + }.items() def set_ip_by_role(self, role: str, new_value): if role == 'oob': @@ -187,12 +187,14 @@ class Host(object): return { self.name: { 'host_profile': self.host_profile, - 'ip': self.ip, + 'ip': self.ip.dict_from_class(), 'type': self.type } } def merge_additional_data(self, config_dict: dict): + if 'rack_name' in config_dict: + self.rack_name = config_dict['rack_name'] if 'type' in config_dict: self.type = config_dict['type'] if 'host_profile' in config_dict: @@ -218,7 +220,7 @@ class Rack(object): """Creates a writeable dict structure from the object""" rack_as_dict = {self.name: {}} for host in self.hosts: - rack_as_dict.update(host.dict_from_class()) + rack_as_dict[self.name].update(host.dict_from_class()) return rack_as_dict def merge_additional_data(self, config_dict: dict): @@ -257,9 +259,16 @@ class VLANNetworkData(object): :Keyword Arguments: * *role* (``str``) - Role of the data entry, defaults to name - * *vlan* (``str``, ``int``) - - * *type* (``str``) - Host type - * *ip* (``IPList``) - List of IP addresses for baremetal host + * *vlan* (``str``, ``int``) - virtual LAN ID number as str or int + * *subnet* (``list``) - list of subnet IP addresses as strings + * *routes* (``list``) - list of routes IP addresses as strings + * *gateway* - gateway address + * *dhcp_start* - DHCP range start + * *dhcp_end* - DHCP range end + * *static_start* - static IP range start + * *static_end* - static IP range end + * *reserved_start* - reserved IP range start + * *reserved_end* - reserved IP range end """ self.name = name self.role = kwargs.get('role', self.name) @@ -307,6 +316,8 @@ class VLANNetworkData(object): return vlan_dict def merge_additional_data(self, config_dict: dict): + if 'role' in config_dict: + self.role = config_dict['role'] if 'vlan' in config_dict: self.vlan = config_dict['vlan'] if 'subnet' in config_dict: @@ -320,15 +331,15 @@ class VLANNetworkData(object): if 'dhcp_start' in config_dict: self.dhcp_start = config_dict['dhcp_start'] if 'dhcp_end' in config_dict: - self.dhcp_start = config_dict['dhcp_end'] + self.dhcp_end = config_dict['dhcp_end'] if 'static_start' in config_dict: - self.dhcp_start = config_dict['static_start'] + self.static_start = config_dict['static_start'] if 'static_end' in config_dict: - self.dhcp_start = config_dict['static_end'] + self.static_end = config_dict['static_end'] if 'reserved_start' in config_dict: - self.dhcp_start = config_dict['reserved_start'] + self.reserved_start = config_dict['reserved_start'] if 'reserved_end' in config_dict: - self.dhcp_start = config_dict['reserved_end'] + self.reserved_end = config_dict['reserved_end'] class Network(object): @@ -445,11 +456,11 @@ class SiteInfo(object): return { 'corridor': self.corridor, 'country': self.country, - 'dns': self.dns, + 'dns': str(self.dns), 'domain': self.domain, 'ldap': self.ldap, 'name': self.name, - 'ntp': self.ntp, + 'ntp': str(self.ntp), 'physical_location_id': self.physical_location_id, 'sitetype': self.sitetype, 'state': self.state, diff --git a/tests/unit/data_extractor/test_models.py b/tests/unit/data_extractor/test_models.py new file mode 100644 index 0000000..71a380b --- /dev/null +++ b/tests/unit/data_extractor/test_models.py @@ -0,0 +1,821 @@ +# Copyright 2019 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from copy import copy +import unittest +from unittest import mock + +from spyglass.data_extractor import models + + +class TestParseIp(unittest.TestCase): + """Tests the _parse_ip validator for Spyglass models""" + + def test__parse_ip(self): + """Tests basic function of _parse_ip validator""" + addr = '10.23.0.1' + result = models._parse_ip(addr) + self.assertEqual(addr, result) + + def test__parse_ip_bad_ip(self): + """Tests that invalid IP addresses are logged as a warning and returned + + without changes + """ + addr = 'not4nip4ddr3$$' + expected_message = '%s is not a valid IP address.' % addr + with self.assertLogs(level='WARNING') as test_log: + result = models._parse_ip(addr) + self.assertEqual(len(test_log.output), 1) + self.assertEqual(len(test_log.records), 1) + self.assertIn(expected_message, test_log.output[0]) + self.assertEqual(addr, result) + + +class TestServerList(unittest.TestCase): + """Tests for the ServerList model""" + + VALID_SERVERS = ['121.12.13.1', '193.153.1.1', '12.23.9.11'] + INVALID_SERVERS = ['not4nip4ddr3$$', '124.34.1.1', 'ALSONOTVALID'] + + def test___init__(self): + """Tests basic initialization of ServerList""" + result = models.ServerList(self.VALID_SERVERS) + self.assertEqual(set(self.VALID_SERVERS), set(result.servers)) + + def test___init___invalid_servers(self): + """Tests initialization of ServerList with bad IPs + + If ServerList is given IP addresses that appear invalid, it should log + warnings for the user. ServerList should still initialize regardless + of the IPs it is given. + """ + expected_messages = [ + '%s is not a valid IP address.' % self.INVALID_SERVERS[0], + '%s is not a valid IP address.' % self.INVALID_SERVERS[2] + ] + with self.assertLogs(level='WARNING') as test_log: + result = models.ServerList(self.INVALID_SERVERS) + self.assertEqual(len(test_log.output), 2) + self.assertEqual(len(test_log.records), 2) + self.assertIn(expected_messages[0], test_log.output[0]) + self.assertIn(expected_messages[1], test_log.output[1]) + self.assertEqual(set(self.INVALID_SERVERS), set(result.servers)) + + def test___str__(self): + """Tests str type casting for ServerList""" + expected_result = ','.join(self.VALID_SERVERS) + result = models.ServerList(self.VALID_SERVERS) + self.assertEqual(expected_result, str(result)) + + def test___iter__(self): + """Tests iterator builtin for ServerList""" + result = models.ServerList(self.VALID_SERVERS) + iterator = iter(result) + self.assertEqual(iterator.__next__(), self.VALID_SERVERS[0]) + self.assertEqual(iterator.__next__(), self.VALID_SERVERS[1]) + self.assertEqual(iterator.__next__(), self.VALID_SERVERS[2]) + + def test_merge_list(self): + """Tests that ServerList can merge additional servers from a list + + ServerList should be able to accept a list type object of IP address + strings and merge them into its current list of servers. + """ + list_to_merge = ['1.1.1.1', '2.2.2.2'] + result = models.ServerList(self.VALID_SERVERS) + self.assertEqual(set(self.VALID_SERVERS), set(result.servers)) + expected_result = [*self.VALID_SERVERS, *list_to_merge] + result.merge(list_to_merge) + self.assertEqual(set(expected_result), set(result.servers)) + + def test_merge_str(self): + """Tests that ServerList can merge additional servers from a str + + ServerList should be able to accept a comma separated list of IP + addresses as strings and merge them into its current list of servers. + """ + list_to_merge = '1.1.1.1,2.2.2.2' + result = models.ServerList(self.VALID_SERVERS) + self.assertEqual(set(self.VALID_SERVERS), set(result.servers)) + expected_result = [*self.VALID_SERVERS, *(list_to_merge.split(','))] + result.merge(list_to_merge) + self.assertEqual(set(expected_result), set(result.servers)) + + +class TestIPList(unittest.TestCase): + """Tests for the IPList model""" + + VALID_IP = { + 'oob': '14.102.252.126', + 'oam': '120.145.167.87', + 'calico': '46.12.178.235', + 'overlay': '226.208.39.49', + 'pxe': '164.99.192.149', + 'storage': '252.63.220.22' + } + INVALID_IP = { + 'oob': '14.102.252.126', + 'oam': 'not4nip4ddr3$$', + 'calico': '46.12.178.235', + 'overlay': '226.208.39.49', + 'pxe': '164.99.192.149', + 'storage': '252.63.220.22' + } + MISSING_IP = { + 'oob': '14.102.252.126', + 'calico': '46.12.178.235', + 'overlay': '226.208.39.49', + 'pxe': '164.99.192.149', + 'storage': '252.63.220.22' + } + + def test___init__(self): + """Tests basic initialization of an IPList""" + result = models.IPList(**self.VALID_IP) + self.assertEqual(self.VALID_IP['oob'], result.oob) + self.assertEqual(self.VALID_IP['oam'], result.oam) + self.assertEqual(self.VALID_IP['calico'], result.calico) + self.assertEqual(self.VALID_IP['overlay'], result.overlay) + self.assertEqual(self.VALID_IP['pxe'], result.pxe) + self.assertEqual(self.VALID_IP['storage'], result.storage) + + def test___init___invalid_ip(self): + """Tests initialization of an IPList using invalid IPs + + When invalid IP addresses are given to IPList, it should log a warning + to the user using _parse_ip(). IPList should still be created using + the invalid address. + """ + expected_message = \ + '%s is not a valid IP address.' % self.INVALID_IP['oam'] + with self.assertLogs(level='WARNING') as test_log: + result = models.IPList(**self.INVALID_IP) + self.assertEqual(len(test_log.output), 1) + self.assertEqual(len(test_log.records), 1) + self.assertIn(expected_message, test_log.output[0]) + self.assertEqual(self.INVALID_IP['oob'], result.oob) + self.assertEqual(self.INVALID_IP['oam'], result.oam) + self.assertEqual(self.INVALID_IP['calico'], result.calico) + self.assertEqual(self.INVALID_IP['overlay'], result.overlay) + self.assertEqual(self.INVALID_IP['pxe'], result.pxe) + self.assertEqual(self.INVALID_IP['storage'], result.storage) + + def test___init___missing_ip(self): + """Tests initialization of an IPList with an entry missing + + IPList should automatically fill in any missing entries with the value + set by models.DATA_DEFAULT. + """ + expected_message = \ + '%s is not a valid IP address.' % models.DATA_DEFAULT + with self.assertLogs(level='WARNING') as test_log: + result = models.IPList(**self.MISSING_IP) + self.assertEqual(len(test_log.output), 1) + self.assertEqual(len(test_log.records), 1) + self.assertIn(expected_message, test_log.output[0]) + self.assertEqual(self.MISSING_IP['oob'], result.oob) + self.assertEqual(models.DATA_DEFAULT, result.oam) + self.assertEqual(self.MISSING_IP['calico'], result.calico) + self.assertEqual(self.MISSING_IP['overlay'], result.overlay) + self.assertEqual(self.MISSING_IP['pxe'], result.pxe) + self.assertEqual(self.MISSING_IP['storage'], result.storage) + + def test___iter__(self): + """Tests iterator builtin for IPList + + When iter() is called on an IPList, it should yield key-value pairs of + each role and its associated IP address. + """ + result = models.IPList(**self.VALID_IP) + for key, value in result.__iter__(): + self.assertIn(key, self.VALID_IP) + self.assertEqual(self.VALID_IP[key], value) + + def test_set_ip_by_role(self): + """Tests setting a single IP by role""" + result = models.IPList(**self.VALID_IP) + new_calico_ip = '87.85.178.249' + result.set_ip_by_role('calico', new_calico_ip) + self.assertEqual(self.VALID_IP['oob'], result.oob) + self.assertEqual(self.VALID_IP['oam'], result.oam) + self.assertEqual(new_calico_ip, result.calico) + self.assertEqual(self.VALID_IP['overlay'], result.overlay) + self.assertEqual(self.VALID_IP['pxe'], result.pxe) + self.assertEqual(self.VALID_IP['storage'], result.storage) + + def test_set_ip_by_role_invalid_role(self): + """Tests setting an invalid role's IP + + Attempting to set an invalid role should log a warning to the user and + do nothing to the IPList's data. + """ + result = models.IPList(**self.VALID_IP) + new_ip = '87.85.178.249' + role = 'DNE' + expected_message = '%s role is not defined for IPList.' % role + with self.assertLogs(level='WARNING') as test_log: + result.set_ip_by_role(role, new_ip) + self.assertEqual(len(test_log.output), 1) + self.assertEqual(len(test_log.records), 1) + self.assertIn(expected_message, test_log.output[0]) + self.assertEqual(self.VALID_IP['oob'], result.oob) + self.assertEqual(self.VALID_IP['oam'], result.oam) + self.assertEqual(self.VALID_IP['calico'], result.calico) + self.assertEqual(self.VALID_IP['overlay'], result.overlay) + self.assertEqual(self.VALID_IP['pxe'], result.pxe) + self.assertEqual(self.VALID_IP['storage'], result.storage) + + def test_dict_from_class(self): + """Tests production of a dictionary from IPList""" + ip_list = models.IPList(**self.VALID_IP) + result = ip_list.dict_from_class() + self.assertDictEqual(self.VALID_IP, result) + + def test_dict_from_class_missing_ip(self): + """Tests production of a dictionary from IPList with a missing IP + + If an IP address is not set for the IPList, it should not appear in the + dictionary output. + """ + missing_ip = copy(self.MISSING_IP) + missing_ip['oam'] = '' + ip_list = models.IPList(**missing_ip) + result = ip_list.dict_from_class() + self.assertDictEqual(self.MISSING_IP, result) + + def test_merge_additional_data(self): + """Tests merging of additional data dictionaries + + Tests that merging of additional data will set a missing role's IP. + """ + config_dict = {'oam': '87.85.178.249'} + ip_list = models.IPList(**self.MISSING_IP) + ip_list.merge_additional_data(config_dict) + self.assertEqual(self.MISSING_IP['oob'], ip_list.oob) + self.assertEqual(config_dict['oam'], ip_list.oam) + self.assertEqual(self.MISSING_IP['calico'], ip_list.calico) + self.assertEqual(self.MISSING_IP['overlay'], ip_list.overlay) + self.assertEqual(self.MISSING_IP['pxe'], ip_list.pxe) + self.assertEqual(self.MISSING_IP['storage'], ip_list.storage) + + +class TestHost(unittest.TestCase): + """Tests for the Host model""" + + HOST_NAME = 'test_host1' + HOST_DATA = { + 'rack_name': 'rack01', + 'host_profile': 'host', + 'type': 'compute' + } + + @mock.patch('spyglass.data_extractor.models.IPList', autospec=True) + def setUp(self, MockIPList): + """Initializes a mocked IPList""" + self.MockIPList = MockIPList + self.HOST_DATA['ip'] = MockIPList() + self.HOST_DATA['ip'].dict_from_class.return_value = 'success' + + def test___init__(self): + """Tests basic initialization of Host""" + result = models.Host(self.HOST_NAME, **self.HOST_DATA) + self.assertEqual(self.HOST_NAME, result.name) + self.assertEqual(self.HOST_DATA['rack_name'], result.rack_name) + self.assertEqual(self.HOST_DATA['host_profile'], result.host_profile) + self.assertEqual(self.HOST_DATA['type'], result.type) + self.assertEqual(self.HOST_DATA['ip'], result.ip) + + def test___init___missing_data(self): + """Tests initialization of Host with missing data + + Unless an attribute is required, Host should automatically fill in the + value set by models.DATA_DEFAULT for each attribute. The only exception + is for the Host's ip attribute which will be a new instance of IPList. + """ + result = models.Host(self.HOST_NAME) + self.assertEqual(self.HOST_NAME, result.name) + self.assertEqual(models.DATA_DEFAULT, result.rack_name) + self.assertEqual(models.DATA_DEFAULT, result.host_profile) + self.assertEqual(models.DATA_DEFAULT, result.type) + self.assertIsInstance(result.ip, models.IPList) + + def test_dict_from_class(self): + """Tests production of a dictionary from a Host object""" + expected_result = { + self.HOST_NAME: { + 'host_profile': self.HOST_DATA['host_profile'], + 'ip': 'success', + 'type': self.HOST_DATA['type'] + } + } + host = models.Host(self.HOST_NAME, **self.HOST_DATA) + result = host.dict_from_class() + self.assertDictEqual(expected_result, result) + + def test_merge_additional_data(self): + """Tests merging of an additional data dictionary into a Host object""" + config_dict = copy(self.HOST_DATA) + config_dict['ip'] = 'success' + result = models.Host(self.HOST_NAME) + self.assertEqual(self.HOST_NAME, result.name) + self.assertEqual(models.DATA_DEFAULT, result.rack_name) + self.assertEqual(models.DATA_DEFAULT, result.host_profile) + self.assertEqual(models.DATA_DEFAULT, result.type) + self.assertIsInstance(result.ip, models.IPList) + result.merge_additional_data(config_dict) + self.assertEqual(self.HOST_NAME, result.name) + self.assertEqual(config_dict['rack_name'], result.rack_name) + self.assertEqual(config_dict['host_profile'], result.host_profile) + self.assertEqual(config_dict['type'], result.type) + self.assertEqual(config_dict['ip'], 'success') + + +class TestRack(unittest.TestCase): + """Tests for the Rack model""" + + RACK_NAME = 'test_rack1' + HOST_DATA = { + 'rack_name': RACK_NAME, + 'host_profile': 'host', + 'type': 'compute' + } + + @mock.patch('spyglass.data_extractor.models.IPList', autospec=True) + def setUp(self, MockIPList): + """Sets up a mocked IPList and a list of Host objects for testing""" + self.MockIPList = MockIPList + self.HOST_DATA['ip'] = MockIPList() + self.HOST_DATA['ip'].dict_from_class.return_value = 'success' + self.hosts = [ + models.Host('test_host1', **self.HOST_DATA), + models.Host('test_host2', **self.HOST_DATA), + models.Host('test_host3', **self.HOST_DATA), + ] + + def test___init__(self): + """Tests basic initialization of Rack""" + result = models.Rack(self.RACK_NAME, self.hosts) + self.assertEqual(self.RACK_NAME, result.name) + self.assertEqual(self.hosts, result.hosts) + + def test_dict_from_class(self): + """Tests production of a dictionary from a Rack object""" + expected_result = {self.RACK_NAME: {}} + expected_result[self.RACK_NAME].update(self.hosts[0].dict_from_class()) + expected_result[self.RACK_NAME].update(self.hosts[1].dict_from_class()) + expected_result[self.RACK_NAME].update(self.hosts[2].dict_from_class()) + result = models.Rack(self.RACK_NAME, self.hosts) + self.assertDictEqual(expected_result, result.dict_from_class()) + + def test_merge_additional_data_new_host(self): + """Tests merging of data containing a new host + + If the additional data dictionary contains a host not already contained + in Rack.hosts, Rack should add the host to the list. + """ + config_dict = {'test_host4': {**self.HOST_DATA}} + result = models.Rack(self.RACK_NAME, self.hosts) + self.assertIsNone(result.get_host_by_name('test_host4')) + result.merge_additional_data(config_dict) + self.assertIsNotNone(result.get_host_by_name('test_host4')) + + def test_merge_additional_data_existing_host(self): + """Tests merging of data containing data for an existing host + + If the additional data dictionary contains a host already contained in + Rack.hosts, Rack should call merge_additional_data on the existing host + with the new data. + """ + config_dict = {'test_host1': {'host_profile': 'new_profile'}} + result = models.Rack(self.RACK_NAME, self.hosts) + self.assertEqual( + self.HOST_DATA['host_profile'], + result.get_host_by_name('test_host1').host_profile) + result.merge_additional_data(config_dict) + self.assertEqual( + config_dict['test_host1']['host_profile'], + result.get_host_by_name('test_host1').host_profile) + + def test_get_host_by_name(self): + """Tests retrieval of a Rack's host by name""" + result = models.Rack(self.RACK_NAME, self.hosts) + self.assertEqual( + self.hosts[1], result.get_host_by_name(self.hosts[1].name)) + + +class TestVLANNetworkData(unittest.TestCase): + """Tests for the VLANNetworkData model""" + + VLAN_NAME = 'test' + VLAN_DATA = { + 'role': 'oam', + 'vlan': '23', + 'subnet': ['210.27.143.213', '127.13.31.192'], + 'routes': ['29.190.93.106', '252.240.25.174'], + 'gateway': '204.70.95.80', + 'dhcp_start': '88.9.225.29', + 'dhcp_end': '71.31.147.105', + 'static_start': '117.137.102.246', + 'static_end': '176.20.227.186', + 'reserved_start': '229.171.15.171', + 'reserved_end': '230.187.248.100' + } + + def test___init__(self): + """Tests basic initialization of VLANNetworkData""" + result = models.VLANNetworkData(self.VLAN_NAME, **self.VLAN_DATA) + self.assertEqual(self.VLAN_NAME, result.name) + self.assertEqual(self.VLAN_DATA['role'], result.role) + self.assertEqual(self.VLAN_DATA['vlan'], result.vlan) + self.assertEqual(self.VLAN_DATA['subnet'], result.subnet) + self.assertEqual(self.VLAN_DATA['routes'], result.routes) + self.assertEqual(self.VLAN_DATA['gateway'], result.gateway) + self.assertEqual(self.VLAN_DATA['dhcp_start'], result.dhcp_start) + self.assertEqual(self.VLAN_DATA['dhcp_end'], result.dhcp_end) + self.assertEqual(self.VLAN_DATA['static_start'], result.static_start) + self.assertEqual(self.VLAN_DATA['static_end'], result.static_end) + self.assertEqual( + self.VLAN_DATA['reserved_start'], result.reserved_start) + self.assertEqual(self.VLAN_DATA['reserved_end'], result.reserved_end) + + def test___init___missing_data(self): + """Tests initialization of VLANNetworkData with missing data + + Any data not explicitly given to VLANNetworkData should be set to None + or an empty list. Since VLANNetworkData can contain a variety of + different settings, many of which are not required, most of these + settings default to None so they will not be outputted when exporting + to a dictionary. + """ + result = models.VLANNetworkData(self.VLAN_NAME) + self.assertEqual(self.VLAN_NAME, result.name) + self.assertEqual(self.VLAN_NAME, result.role) + self.assertIsNone(result.vlan) + self.assertEqual([], result.subnet) + self.assertEqual([], result.routes) + self.assertIsNone(result.dhcp_start) + self.assertIsNone(result.dhcp_end) + self.assertIsNone(result.static_start) + self.assertIsNone(result.static_end) + self.assertIsNone(result.reserved_start) + self.assertIsNone(result.reserved_end) + + def test_dict_from_class(self): + """Tests production of a dictionary from a VLANNetworkData object""" + copy_vlan_data = copy(self.VLAN_DATA) + copy_vlan_data.pop('role') + expected_result = {self.VLAN_DATA['role']: {**copy_vlan_data}} + result = models.VLANNetworkData(self.VLAN_NAME, **self.VLAN_DATA) + self.assertDictEqual(expected_result, result.dict_from_class()) + + def test_merge_additional_data(self): + """Tests merging of additional data into a VLANNetworkData object""" + result = models.VLANNetworkData(self.VLAN_NAME) + self.assertEqual(self.VLAN_NAME, result.name) + self.assertEqual(self.VLAN_NAME, result.role) + self.assertIsNone(result.vlan) + self.assertEqual([], result.subnet) + self.assertEqual([], result.routes) + self.assertIsNone(result.dhcp_start) + self.assertIsNone(result.dhcp_end) + self.assertIsNone(result.static_start) + self.assertIsNone(result.static_end) + self.assertIsNone(result.reserved_start) + self.assertIsNone(result.reserved_end) + result.merge_additional_data(self.VLAN_DATA) + self.assertEqual(self.VLAN_NAME, result.name) + self.assertEqual(self.VLAN_DATA['role'], result.role) + self.assertEqual(self.VLAN_DATA['vlan'], result.vlan) + self.assertEqual(self.VLAN_DATA['subnet'], result.subnet) + self.assertEqual(self.VLAN_DATA['routes'], result.routes) + self.assertEqual(self.VLAN_DATA['gateway'], result.gateway) + self.assertEqual(self.VLAN_DATA['dhcp_start'], result.dhcp_start) + self.assertEqual(self.VLAN_DATA['dhcp_end'], result.dhcp_end) + self.assertEqual(self.VLAN_DATA['static_start'], result.static_start) + self.assertEqual(self.VLAN_DATA['static_end'], result.static_end) + self.assertEqual( + self.VLAN_DATA['reserved_start'], result.reserved_start) + self.assertEqual(self.VLAN_DATA['reserved_end'], result.reserved_end) + + +class TestNetwork(unittest.TestCase): + """Tests for the Network model""" + + VLAN_DATA = { + 'vlan': '23', + 'subnet': ['210.27.143.213', '127.13.31.192'], + 'routes': ['29.190.93.106', '252.240.25.174'], + 'gateway': '204.70.95.80', + 'dhcp_start': '88.9.225.29', + 'dhcp_end': '71.31.147.105', + 'static_start': '117.137.102.246', + 'static_end': '176.20.227.186', + 'reserved_start': '229.171.15.171', + 'reserved_end': '230.187.248.100' + } + BGP_DATA = { + 'asnumber': 64671, + 'ingress_vip': '10.0.220.73', + 'peer_asnumber': 64688 + } + + def setUp(self): + """Sets up a list of VLANNetworkData used to test Network objects""" + self.vlan_network_data = [ + models.VLANNetworkData('oam', **self.VLAN_DATA), + models.VLANNetworkData('oob', **self.VLAN_DATA), + models.VLANNetworkData('pxe', **self.VLAN_DATA) + ] + + def test___init__(self): + """Tests basic initialization of a Network object""" + result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA) + self.assertEqual( + set(self.vlan_network_data), set(result.vlan_network_data)) + self.assertDictEqual(self.BGP_DATA, result.bgp) + + def test_dict_from_class(self): + """Tests production of a dictionary from a Network object""" + expected_result = { + 'bgp': self.BGP_DATA, + 'vlan_network_data': { + 'oam': { + **self.VLAN_DATA + }, + 'oob': { + **self.VLAN_DATA + }, + 'pxe': { + **self.VLAN_DATA + }, + } + } + result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA) + self.assertDictEqual(expected_result, result.dict_from_class()) + + def test_merge_additional_data_bgp(self): + """Tests merging additional BGP data + + BGP data is often set after the initial generation of data objects. + """ + result = models.Network(self.vlan_network_data) + self.assertEqual({}, result.bgp) + result.merge_additional_data({'bgp': self.BGP_DATA}) + self.assertEqual(self.BGP_DATA, result.bgp) + + def test_merge_additional_data_new_vlan_data(self): + """Tests merging of data containing data for a new VLANNetworkData obj + + If a new set of VLANNetworkData is introduced in additional data, + Network should create a new VLANNetworkData object for its list. + """ + result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA) + self.assertIsNone(result.get_vlan_data_by_name('calico')) + new_calico_vlan = {'vlan_network_data': {'calico': {**self.VLAN_DATA}}} + result.merge_additional_data(new_calico_vlan) + self.assertIsNotNone(result.get_vlan_data_by_name('calico')) + + def test_merge_additional_data_new_data_for_existing_vlan(self): + """Tests merging of data for an existing VLANNetworkData object + + If a new set of data for an existing VLANNetworkData role is given by + additional data, Network should merge this new data into the existing + VLANNetworkData object. + """ + result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA) + self.assertEqual( + self.VLAN_DATA['vlan'], + result.get_vlan_data_by_name('oob').vlan) + new_vlan_data = {'vlan_network_data': {'oob': {'vlan': '12'}}} + result.merge_additional_data(new_vlan_data) + self.assertEqual( + new_vlan_data['vlan_network_data']['oob']['vlan'], + result.get_vlan_data_by_name('oob').vlan) + + def test_get_vlan_data_by_name(self): + """Tests retrieval of VLANNetworkData by name attribute""" + result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA) + self.assertEqual( + self.vlan_network_data[1], result.get_vlan_data_by_name('oob')) + + def test_get_vlan_data_by_name_dne(self): + """Tests retrieval of nonexistent name VLANNetworkData""" + result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA) + self.assertIsNone(result.get_vlan_data_by_name('calico')) + + def test_get_vlan_data_by_role(self): + """Tests retrieval of VLANNetworkData by role attribute""" + result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA) + self.assertEqual( + self.vlan_network_data[1], result.get_vlan_data_by_role('oob')) + + def test_get_vlan_data_by_role_dne(self): + """Tests retrieval of nonexistent role VLANNetworkData""" + result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA) + self.assertIsNone(result.get_vlan_data_by_role('calico')) + + +class TestSiteInfo(unittest.TestCase): + """Tests for the SiteInfo model""" + + SITE_NAME = 'Test Site' + SITE_INFO = { + 'physical_location_id': 12345, + 'state': 'MO', + 'country': 'USA', + 'corridor': 'C1', + 'sitetype': 'test', + 'dns': ['210.27.143.213', '127.13.31.192'], + 'ntp': ['29.190.93.106', '252.240.25.174'], + 'domain': 'example.com', + 'ldap': { + 'common_name': 'test', + 'domain': 'example', + 'subdomain': 'test', + 'url': 'ldap://ldap.example.com' + } + } + + def test___init__(self): + """Tests basic initialization of SiteInfo""" + result = models.SiteInfo(self.SITE_NAME, **self.SITE_INFO) + self.assertEqual(self.SITE_NAME, result.name) + self.assertEqual( + self.SITE_INFO['physical_location_id'], + result.physical_location_id) + self.assertEqual(self.SITE_INFO['state'], result.state) + self.assertEqual(self.SITE_INFO['country'], result.country) + self.assertEqual(self.SITE_INFO['corridor'], result.corridor) + self.assertEqual(self.SITE_INFO['sitetype'], result.sitetype) + self.assertEqual(','.join(self.SITE_INFO['dns']), str(result.dns)) + self.assertEqual(','.join(self.SITE_INFO['ntp']), str(result.ntp)) + self.assertEqual(self.SITE_INFO['domain'], result.domain) + self.assertDictEqual(self.SITE_INFO['ldap'], result.ldap) + + def test___init___missing_data(self): + """Tests initailization of SiteInfo with missing data + + If data is not given for SiteInfo attributes, the attributes should + be set to the value given by models.DATA_DEFAULT, an empty list, or + an empty dictionary. + """ + result = models.SiteInfo(self.SITE_NAME) + self.assertEqual(self.SITE_NAME, result.name) + self.assertEqual(models.DATA_DEFAULT, result.physical_location_id) + self.assertEqual(models.DATA_DEFAULT, result.state) + self.assertEqual(models.DATA_DEFAULT, result.country) + self.assertEqual(models.DATA_DEFAULT, result.corridor) + self.assertEqual(models.DATA_DEFAULT, result.sitetype) + self.assertEqual(','.join([]), str(result.dns)) + self.assertEqual(','.join([]), str(result.ntp)) + self.assertEqual(models.DATA_DEFAULT, result.domain) + self.assertDictEqual({}, result.ldap) + + def test_dict_from_class(self): + """Tests production of a dictionary from a SiteInfo object""" + expected_results = copy(self.SITE_INFO) + expected_results['dns'] = ','.join(expected_results['dns']) + expected_results['ntp'] = ','.join(expected_results['ntp']) + expected_results['name'] = self.SITE_NAME + result = models.SiteInfo(self.SITE_NAME, **self.SITE_INFO) + self.assertDictEqual(expected_results, result.dict_from_class()) + + def test_merge_additional_data(self): + """Tests merging of additional data into SiteInfo""" + result = models.SiteInfo(self.SITE_NAME) + self.assertEqual(self.SITE_NAME, result.name) + self.assertEqual(models.DATA_DEFAULT, result.physical_location_id) + self.assertEqual(models.DATA_DEFAULT, result.state) + self.assertEqual(models.DATA_DEFAULT, result.country) + self.assertEqual(models.DATA_DEFAULT, result.corridor) + self.assertEqual(models.DATA_DEFAULT, result.sitetype) + self.assertEqual(','.join([]), str(result.dns)) + self.assertEqual(','.join([]), str(result.ntp)) + self.assertEqual(models.DATA_DEFAULT, result.domain) + self.assertDictEqual({}, result.ldap) + config_dict = copy(self.SITE_INFO) + config_dict['dns'] = {'servers': config_dict['dns']} + config_dict['ntp'] = {'servers': config_dict['ntp']} + config_dict['name'] = 'new_name' + result.merge_additional_data(config_dict) + self.assertEqual(config_dict['name'], result.name) + self.assertEqual( + self.SITE_INFO['physical_location_id'], + result.physical_location_id) + self.assertEqual(self.SITE_INFO['state'], result.state) + self.assertEqual(self.SITE_INFO['country'], result.country) + self.assertEqual(self.SITE_INFO['corridor'], result.corridor) + self.assertEqual(self.SITE_INFO['sitetype'], result.sitetype) + self.assertEqual(','.join(self.SITE_INFO['dns']), str(result.dns)) + self.assertEqual(','.join(self.SITE_INFO['ntp']), str(result.ntp)) + self.assertEqual(self.SITE_INFO['domain'], result.domain) + self.assertDictEqual(self.SITE_INFO['ldap'], result.ldap) + + +class TestSiteDocumentData(unittest.TestCase): + """Tests for the SiteDocumentData model""" + + STORAGE_DICT = {'ceph': {'controller': {'osd_count': 6}}} + + @mock.patch('spyglass.data_extractor.models.SiteInfo') + @mock.patch('spyglass.data_extractor.models.Network') + @mock.patch('spyglass.data_extractor.models.Rack') + def test___init__(self, Rack, Network, SiteInfo): + """Tests basic initialization of SiteDocumentData""" + site_info = SiteInfo() + network = Network() + baremetal = [Rack(), Rack(), Rack()] + result = models.SiteDocumentData( + site_info, network, baremetal, self.STORAGE_DICT) + self.assertEqual(site_info, result.site_info) + self.assertEqual(network, result.network) + self.assertEqual(set(baremetal), set(result.baremetal)) + self.assertDictEqual(self.STORAGE_DICT, result.storage) + + @mock.patch('spyglass.data_extractor.models.SiteInfo') + @mock.patch('spyglass.data_extractor.models.Network') + @mock.patch('spyglass.data_extractor.models.Rack') + def test_dict_from_class(self, Rack, Network, SiteInfo): + """Tests production of a dictionary from a SiteDocumentData object""" + mock_site_info_data = {'name': 'test', 'country': 'USA'} + mock_network_data = { + 'bgp': 'bgp_data', + 'vlan_network_data': 'vlan_data' + } + mock_baremetal0_data = {'rack1': {'host1': 'data'}} + mock_baremetal1_data = {'rack2': {'host2': 'data'}} + + site_info = SiteInfo() + site_info.dict_from_class.return_value = mock_site_info_data + network = Network() + network.dict_from_class.return_value = mock_network_data + baremetal = [Rack(), Rack()] + Rack().dict_from_class.side_effect = \ + [mock_baremetal0_data, mock_baremetal1_data] + + expected_result = { + 'baremetal': { + **mock_baremetal0_data, + **mock_baremetal1_data + }, + 'network': mock_network_data, + 'site_info': mock_site_info_data, + 'storage': self.STORAGE_DICT + } + + result = models.SiteDocumentData( + site_info, network, baremetal, self.STORAGE_DICT) + self.assertDictEqual(expected_result, result.dict_from_class()) + + @mock.patch('spyglass.data_extractor.models.SiteInfo') + @mock.patch('spyglass.data_extractor.models.Network') + @mock.patch('spyglass.data_extractor.models.Rack') + def test_merge_additional_data_storage(self, Rack, Network, SiteInfo): + """Tests merging of storage data dictionary + + Storage data is often given after initialization of data objects. + """ + site_info = SiteInfo() + network = Network() + baremetal = [Rack(), Rack(), Rack()] + result = models.SiteDocumentData(site_info, network, baremetal) + self.assertIsNone(result.storage) + result.merge_additional_data({'storage': self.STORAGE_DICT}) + self.assertDictEqual(self.STORAGE_DICT, result.storage) + + @mock.patch('spyglass.data_extractor.models.SiteInfo') + @mock.patch('spyglass.data_extractor.models.Network') + @mock.patch('spyglass.data_extractor.models.Rack') + def test_get_baremetal_rack_by_name(self, Rack, Network, SiteInfo): + """Tests retrieval of baremetal rack by name""" + site_info = SiteInfo() + network = Network() + baremetal = [Rack(), Rack(), Rack()] + type(Rack()).name = mock.PropertyMock( + side_effect=['rack1', 'rack2', 'rack3']) + result = models.SiteDocumentData(site_info, network, baremetal) + self.assertIsNotNone(result.get_baremetal_rack_by_name('rack2')) + + @mock.patch('spyglass.data_extractor.models.SiteInfo') + @mock.patch('spyglass.data_extractor.models.Network') + @mock.patch('spyglass.data_extractor.models.Rack') + def test_get_baremetal_rack_by_name_dne(self, Rack, Network, SiteInfo): + """Tests retrieval of nonexistent baremetal rack by name""" + site_info = SiteInfo() + network = Network() + baremetal = [Rack(), Rack()] + type(Rack()).name = mock.PropertyMock(side_effect=['rack1', 'rack3']) + result = models.SiteDocumentData(site_info, network, baremetal) + self.assertIsNone(result.get_baremetal_rack_by_name('rack2')) diff --git a/tox.ini b/tox.ini index 54ed7be..60481e6 100644 --- a/tox.ini +++ b/tox.ini @@ -72,6 +72,6 @@ deps = commands = bash -c 'PATH=$PATH:~/.local/bin; pytest --cov=spyglass --cov-report \ html:cover --cov-report xml:cover/coverage.xml --cov-report term \ - --cov-fail-under 50 tests/' + --cov-fail-under 60 tests/' whitelist_externals = bash