diff --git a/spyglass/data_extractor/models.py b/spyglass/data_extractor/models.py index df8cc24..aabdc4e 100644 --- a/spyglass/data_extractor/models.py +++ b/spyglass/data_extractor/models.py @@ -57,7 +57,7 @@ class ServerList(object): def __iter__(self): yield from self.servers - def merge(self, server_list: str): + def merge(self, server_list): """Merges a comma separated server list into the object This method is used to merge additional servers into the list. This is @@ -106,7 +106,7 @@ class IPList(object): 'overlay': self.overlay, 'pxe': self.pxe, 'storage': self.storage - } + }.items() def set_ip_by_role(self, role: str, new_value): if role == 'oob': @@ -187,12 +187,14 @@ class Host(object): return { self.name: { 'host_profile': self.host_profile, - 'ip': self.ip, + 'ip': self.ip.dict_from_class(), 'type': self.type } } def merge_additional_data(self, config_dict: dict): + if 'rack_name' in config_dict: + self.rack_name = config_dict['rack_name'] if 'type' in config_dict: self.type = config_dict['type'] if 'host_profile' in config_dict: @@ -218,7 +220,7 @@ class Rack(object): """Creates a writeable dict structure from the object""" rack_as_dict = {self.name: {}} for host in self.hosts: - rack_as_dict.update(host.dict_from_class()) + rack_as_dict[self.name].update(host.dict_from_class()) return rack_as_dict def merge_additional_data(self, config_dict: dict): @@ -257,9 +259,16 @@ class VLANNetworkData(object): :Keyword Arguments: * *role* (``str``) - Role of the data entry, defaults to name - * *vlan* (``str``, ``int``) - - * *type* (``str``) - Host type - * *ip* (``IPList``) - List of IP addresses for baremetal host + * *vlan* (``str``, ``int``) - virtual LAN ID number as str or int + * *subnet* (``list``) - list of subnet IP addresses as strings + * *routes* (``list``) - list of routes IP addresses as strings + * *gateway* - gateway address + * *dhcp_start* - DHCP range start + * *dhcp_end* - DHCP range end + * *static_start* - static IP range start + * *static_end* - static IP range end + * *reserved_start* - reserved IP range start + * *reserved_end* - reserved IP range end """ self.name = name self.role = kwargs.get('role', self.name) @@ -307,6 +316,8 @@ class VLANNetworkData(object): return vlan_dict def merge_additional_data(self, config_dict: dict): + if 'role' in config_dict: + self.role = config_dict['role'] if 'vlan' in config_dict: self.vlan = config_dict['vlan'] if 'subnet' in config_dict: @@ -320,15 +331,15 @@ class VLANNetworkData(object): if 'dhcp_start' in config_dict: self.dhcp_start = config_dict['dhcp_start'] if 'dhcp_end' in config_dict: - self.dhcp_start = config_dict['dhcp_end'] + self.dhcp_end = config_dict['dhcp_end'] if 'static_start' in config_dict: - self.dhcp_start = config_dict['static_start'] + self.static_start = config_dict['static_start'] if 'static_end' in config_dict: - self.dhcp_start = config_dict['static_end'] + self.static_end = config_dict['static_end'] if 'reserved_start' in config_dict: - self.dhcp_start = config_dict['reserved_start'] + self.reserved_start = config_dict['reserved_start'] if 'reserved_end' in config_dict: - self.dhcp_start = config_dict['reserved_end'] + self.reserved_end = config_dict['reserved_end'] class Network(object): @@ -445,11 +456,11 @@ class SiteInfo(object): return { 'corridor': self.corridor, 'country': self.country, - 'dns': self.dns, + 'dns': str(self.dns), 'domain': self.domain, 'ldap': self.ldap, 'name': self.name, - 'ntp': self.ntp, + 'ntp': str(self.ntp), 'physical_location_id': self.physical_location_id, 'sitetype': self.sitetype, 'state': self.state, diff --git a/tests/unit/data_extractor/test_models.py b/tests/unit/data_extractor/test_models.py new file mode 100644 index 0000000..71a380b --- /dev/null +++ b/tests/unit/data_extractor/test_models.py @@ -0,0 +1,821 @@ +# Copyright 2019 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from copy import copy +import unittest +from unittest import mock + +from spyglass.data_extractor import models + + +class TestParseIp(unittest.TestCase): + """Tests the _parse_ip validator for Spyglass models""" + + def test__parse_ip(self): + """Tests basic function of _parse_ip validator""" + addr = '10.23.0.1' + result = models._parse_ip(addr) + self.assertEqual(addr, result) + + def test__parse_ip_bad_ip(self): + """Tests that invalid IP addresses are logged as a warning and returned + + without changes + """ + addr = 'not4nip4ddr3$$' + expected_message = '%s is not a valid IP address.' % addr + with self.assertLogs(level='WARNING') as test_log: + result = models._parse_ip(addr) + self.assertEqual(len(test_log.output), 1) + self.assertEqual(len(test_log.records), 1) + self.assertIn(expected_message, test_log.output[0]) + self.assertEqual(addr, result) + + +class TestServerList(unittest.TestCase): + """Tests for the ServerList model""" + + VALID_SERVERS = ['121.12.13.1', '193.153.1.1', '12.23.9.11'] + INVALID_SERVERS = ['not4nip4ddr3$$', '124.34.1.1', 'ALSONOTVALID'] + + def test___init__(self): + """Tests basic initialization of ServerList""" + result = models.ServerList(self.VALID_SERVERS) + self.assertEqual(set(self.VALID_SERVERS), set(result.servers)) + + def test___init___invalid_servers(self): + """Tests initialization of ServerList with bad IPs + + If ServerList is given IP addresses that appear invalid, it should log + warnings for the user. ServerList should still initialize regardless + of the IPs it is given. + """ + expected_messages = [ + '%s is not a valid IP address.' % self.INVALID_SERVERS[0], + '%s is not a valid IP address.' % self.INVALID_SERVERS[2] + ] + with self.assertLogs(level='WARNING') as test_log: + result = models.ServerList(self.INVALID_SERVERS) + self.assertEqual(len(test_log.output), 2) + self.assertEqual(len(test_log.records), 2) + self.assertIn(expected_messages[0], test_log.output[0]) + self.assertIn(expected_messages[1], test_log.output[1]) + self.assertEqual(set(self.INVALID_SERVERS), set(result.servers)) + + def test___str__(self): + """Tests str type casting for ServerList""" + expected_result = ','.join(self.VALID_SERVERS) + result = models.ServerList(self.VALID_SERVERS) + self.assertEqual(expected_result, str(result)) + + def test___iter__(self): + """Tests iterator builtin for ServerList""" + result = models.ServerList(self.VALID_SERVERS) + iterator = iter(result) + self.assertEqual(iterator.__next__(), self.VALID_SERVERS[0]) + self.assertEqual(iterator.__next__(), self.VALID_SERVERS[1]) + self.assertEqual(iterator.__next__(), self.VALID_SERVERS[2]) + + def test_merge_list(self): + """Tests that ServerList can merge additional servers from a list + + ServerList should be able to accept a list type object of IP address + strings and merge them into its current list of servers. + """ + list_to_merge = ['1.1.1.1', '2.2.2.2'] + result = models.ServerList(self.VALID_SERVERS) + self.assertEqual(set(self.VALID_SERVERS), set(result.servers)) + expected_result = [*self.VALID_SERVERS, *list_to_merge] + result.merge(list_to_merge) + self.assertEqual(set(expected_result), set(result.servers)) + + def test_merge_str(self): + """Tests that ServerList can merge additional servers from a str + + ServerList should be able to accept a comma separated list of IP + addresses as strings and merge them into its current list of servers. + """ + list_to_merge = '1.1.1.1,2.2.2.2' + result = models.ServerList(self.VALID_SERVERS) + self.assertEqual(set(self.VALID_SERVERS), set(result.servers)) + expected_result = [*self.VALID_SERVERS, *(list_to_merge.split(','))] + result.merge(list_to_merge) + self.assertEqual(set(expected_result), set(result.servers)) + + +class TestIPList(unittest.TestCase): + """Tests for the IPList model""" + + VALID_IP = { + 'oob': '14.102.252.126', + 'oam': '120.145.167.87', + 'calico': '46.12.178.235', + 'overlay': '226.208.39.49', + 'pxe': '164.99.192.149', + 'storage': '252.63.220.22' + } + INVALID_IP = { + 'oob': '14.102.252.126', + 'oam': 'not4nip4ddr3$$', + 'calico': '46.12.178.235', + 'overlay': '226.208.39.49', + 'pxe': '164.99.192.149', + 'storage': '252.63.220.22' + } + MISSING_IP = { + 'oob': '14.102.252.126', + 'calico': '46.12.178.235', + 'overlay': '226.208.39.49', + 'pxe': '164.99.192.149', + 'storage': '252.63.220.22' + } + + def test___init__(self): + """Tests basic initialization of an IPList""" + result = models.IPList(**self.VALID_IP) + self.assertEqual(self.VALID_IP['oob'], result.oob) + self.assertEqual(self.VALID_IP['oam'], result.oam) + self.assertEqual(self.VALID_IP['calico'], result.calico) + self.assertEqual(self.VALID_IP['overlay'], result.overlay) + self.assertEqual(self.VALID_IP['pxe'], result.pxe) + self.assertEqual(self.VALID_IP['storage'], result.storage) + + def test___init___invalid_ip(self): + """Tests initialization of an IPList using invalid IPs + + When invalid IP addresses are given to IPList, it should log a warning + to the user using _parse_ip(). IPList should still be created using + the invalid address. + """ + expected_message = \ + '%s is not a valid IP address.' % self.INVALID_IP['oam'] + with self.assertLogs(level='WARNING') as test_log: + result = models.IPList(**self.INVALID_IP) + self.assertEqual(len(test_log.output), 1) + self.assertEqual(len(test_log.records), 1) + self.assertIn(expected_message, test_log.output[0]) + self.assertEqual(self.INVALID_IP['oob'], result.oob) + self.assertEqual(self.INVALID_IP['oam'], result.oam) + self.assertEqual(self.INVALID_IP['calico'], result.calico) + self.assertEqual(self.INVALID_IP['overlay'], result.overlay) + self.assertEqual(self.INVALID_IP['pxe'], result.pxe) + self.assertEqual(self.INVALID_IP['storage'], result.storage) + + def test___init___missing_ip(self): + """Tests initialization of an IPList with an entry missing + + IPList should automatically fill in any missing entries with the value + set by models.DATA_DEFAULT. + """ + expected_message = \ + '%s is not a valid IP address.' % models.DATA_DEFAULT + with self.assertLogs(level='WARNING') as test_log: + result = models.IPList(**self.MISSING_IP) + self.assertEqual(len(test_log.output), 1) + self.assertEqual(len(test_log.records), 1) + self.assertIn(expected_message, test_log.output[0]) + self.assertEqual(self.MISSING_IP['oob'], result.oob) + self.assertEqual(models.DATA_DEFAULT, result.oam) + self.assertEqual(self.MISSING_IP['calico'], result.calico) + self.assertEqual(self.MISSING_IP['overlay'], result.overlay) + self.assertEqual(self.MISSING_IP['pxe'], result.pxe) + self.assertEqual(self.MISSING_IP['storage'], result.storage) + + def test___iter__(self): + """Tests iterator builtin for IPList + + When iter() is called on an IPList, it should yield key-value pairs of + each role and its associated IP address. + """ + result = models.IPList(**self.VALID_IP) + for key, value in result.__iter__(): + self.assertIn(key, self.VALID_IP) + self.assertEqual(self.VALID_IP[key], value) + + def test_set_ip_by_role(self): + """Tests setting a single IP by role""" + result = models.IPList(**self.VALID_IP) + new_calico_ip = '87.85.178.249' + result.set_ip_by_role('calico', new_calico_ip) + self.assertEqual(self.VALID_IP['oob'], result.oob) + self.assertEqual(self.VALID_IP['oam'], result.oam) + self.assertEqual(new_calico_ip, result.calico) + self.assertEqual(self.VALID_IP['overlay'], result.overlay) + self.assertEqual(self.VALID_IP['pxe'], result.pxe) + self.assertEqual(self.VALID_IP['storage'], result.storage) + + def test_set_ip_by_role_invalid_role(self): + """Tests setting an invalid role's IP + + Attempting to set an invalid role should log a warning to the user and + do nothing to the IPList's data. + """ + result = models.IPList(**self.VALID_IP) + new_ip = '87.85.178.249' + role = 'DNE' + expected_message = '%s role is not defined for IPList.' % role + with self.assertLogs(level='WARNING') as test_log: + result.set_ip_by_role(role, new_ip) + self.assertEqual(len(test_log.output), 1) + self.assertEqual(len(test_log.records), 1) + self.assertIn(expected_message, test_log.output[0]) + self.assertEqual(self.VALID_IP['oob'], result.oob) + self.assertEqual(self.VALID_IP['oam'], result.oam) + self.assertEqual(self.VALID_IP['calico'], result.calico) + self.assertEqual(self.VALID_IP['overlay'], result.overlay) + self.assertEqual(self.VALID_IP['pxe'], result.pxe) + self.assertEqual(self.VALID_IP['storage'], result.storage) + + def test_dict_from_class(self): + """Tests production of a dictionary from IPList""" + ip_list = models.IPList(**self.VALID_IP) + result = ip_list.dict_from_class() + self.assertDictEqual(self.VALID_IP, result) + + def test_dict_from_class_missing_ip(self): + """Tests production of a dictionary from IPList with a missing IP + + If an IP address is not set for the IPList, it should not appear in the + dictionary output. + """ + missing_ip = copy(self.MISSING_IP) + missing_ip['oam'] = '' + ip_list = models.IPList(**missing_ip) + result = ip_list.dict_from_class() + self.assertDictEqual(self.MISSING_IP, result) + + def test_merge_additional_data(self): + """Tests merging of additional data dictionaries + + Tests that merging of additional data will set a missing role's IP. + """ + config_dict = {'oam': '87.85.178.249'} + ip_list = models.IPList(**self.MISSING_IP) + ip_list.merge_additional_data(config_dict) + self.assertEqual(self.MISSING_IP['oob'], ip_list.oob) + self.assertEqual(config_dict['oam'], ip_list.oam) + self.assertEqual(self.MISSING_IP['calico'], ip_list.calico) + self.assertEqual(self.MISSING_IP['overlay'], ip_list.overlay) + self.assertEqual(self.MISSING_IP['pxe'], ip_list.pxe) + self.assertEqual(self.MISSING_IP['storage'], ip_list.storage) + + +class TestHost(unittest.TestCase): + """Tests for the Host model""" + + HOST_NAME = 'test_host1' + HOST_DATA = { + 'rack_name': 'rack01', + 'host_profile': 'host', + 'type': 'compute' + } + + @mock.patch('spyglass.data_extractor.models.IPList', autospec=True) + def setUp(self, MockIPList): + """Initializes a mocked IPList""" + self.MockIPList = MockIPList + self.HOST_DATA['ip'] = MockIPList() + self.HOST_DATA['ip'].dict_from_class.return_value = 'success' + + def test___init__(self): + """Tests basic initialization of Host""" + result = models.Host(self.HOST_NAME, **self.HOST_DATA) + self.assertEqual(self.HOST_NAME, result.name) + self.assertEqual(self.HOST_DATA['rack_name'], result.rack_name) + self.assertEqual(self.HOST_DATA['host_profile'], result.host_profile) + self.assertEqual(self.HOST_DATA['type'], result.type) + self.assertEqual(self.HOST_DATA['ip'], result.ip) + + def test___init___missing_data(self): + """Tests initialization of Host with missing data + + Unless an attribute is required, Host should automatically fill in the + value set by models.DATA_DEFAULT for each attribute. The only exception + is for the Host's ip attribute which will be a new instance of IPList. + """ + result = models.Host(self.HOST_NAME) + self.assertEqual(self.HOST_NAME, result.name) + self.assertEqual(models.DATA_DEFAULT, result.rack_name) + self.assertEqual(models.DATA_DEFAULT, result.host_profile) + self.assertEqual(models.DATA_DEFAULT, result.type) + self.assertIsInstance(result.ip, models.IPList) + + def test_dict_from_class(self): + """Tests production of a dictionary from a Host object""" + expected_result = { + self.HOST_NAME: { + 'host_profile': self.HOST_DATA['host_profile'], + 'ip': 'success', + 'type': self.HOST_DATA['type'] + } + } + host = models.Host(self.HOST_NAME, **self.HOST_DATA) + result = host.dict_from_class() + self.assertDictEqual(expected_result, result) + + def test_merge_additional_data(self): + """Tests merging of an additional data dictionary into a Host object""" + config_dict = copy(self.HOST_DATA) + config_dict['ip'] = 'success' + result = models.Host(self.HOST_NAME) + self.assertEqual(self.HOST_NAME, result.name) + self.assertEqual(models.DATA_DEFAULT, result.rack_name) + self.assertEqual(models.DATA_DEFAULT, result.host_profile) + self.assertEqual(models.DATA_DEFAULT, result.type) + self.assertIsInstance(result.ip, models.IPList) + result.merge_additional_data(config_dict) + self.assertEqual(self.HOST_NAME, result.name) + self.assertEqual(config_dict['rack_name'], result.rack_name) + self.assertEqual(config_dict['host_profile'], result.host_profile) + self.assertEqual(config_dict['type'], result.type) + self.assertEqual(config_dict['ip'], 'success') + + +class TestRack(unittest.TestCase): + """Tests for the Rack model""" + + RACK_NAME = 'test_rack1' + HOST_DATA = { + 'rack_name': RACK_NAME, + 'host_profile': 'host', + 'type': 'compute' + } + + @mock.patch('spyglass.data_extractor.models.IPList', autospec=True) + def setUp(self, MockIPList): + """Sets up a mocked IPList and a list of Host objects for testing""" + self.MockIPList = MockIPList + self.HOST_DATA['ip'] = MockIPList() + self.HOST_DATA['ip'].dict_from_class.return_value = 'success' + self.hosts = [ + models.Host('test_host1', **self.HOST_DATA), + models.Host('test_host2', **self.HOST_DATA), + models.Host('test_host3', **self.HOST_DATA), + ] + + def test___init__(self): + """Tests basic initialization of Rack""" + result = models.Rack(self.RACK_NAME, self.hosts) + self.assertEqual(self.RACK_NAME, result.name) + self.assertEqual(self.hosts, result.hosts) + + def test_dict_from_class(self): + """Tests production of a dictionary from a Rack object""" + expected_result = {self.RACK_NAME: {}} + expected_result[self.RACK_NAME].update(self.hosts[0].dict_from_class()) + expected_result[self.RACK_NAME].update(self.hosts[1].dict_from_class()) + expected_result[self.RACK_NAME].update(self.hosts[2].dict_from_class()) + result = models.Rack(self.RACK_NAME, self.hosts) + self.assertDictEqual(expected_result, result.dict_from_class()) + + def test_merge_additional_data_new_host(self): + """Tests merging of data containing a new host + + If the additional data dictionary contains a host not already contained + in Rack.hosts, Rack should add the host to the list. + """ + config_dict = {'test_host4': {**self.HOST_DATA}} + result = models.Rack(self.RACK_NAME, self.hosts) + self.assertIsNone(result.get_host_by_name('test_host4')) + result.merge_additional_data(config_dict) + self.assertIsNotNone(result.get_host_by_name('test_host4')) + + def test_merge_additional_data_existing_host(self): + """Tests merging of data containing data for an existing host + + If the additional data dictionary contains a host already contained in + Rack.hosts, Rack should call merge_additional_data on the existing host + with the new data. + """ + config_dict = {'test_host1': {'host_profile': 'new_profile'}} + result = models.Rack(self.RACK_NAME, self.hosts) + self.assertEqual( + self.HOST_DATA['host_profile'], + result.get_host_by_name('test_host1').host_profile) + result.merge_additional_data(config_dict) + self.assertEqual( + config_dict['test_host1']['host_profile'], + result.get_host_by_name('test_host1').host_profile) + + def test_get_host_by_name(self): + """Tests retrieval of a Rack's host by name""" + result = models.Rack(self.RACK_NAME, self.hosts) + self.assertEqual( + self.hosts[1], result.get_host_by_name(self.hosts[1].name)) + + +class TestVLANNetworkData(unittest.TestCase): + """Tests for the VLANNetworkData model""" + + VLAN_NAME = 'test' + VLAN_DATA = { + 'role': 'oam', + 'vlan': '23', + 'subnet': ['210.27.143.213', '127.13.31.192'], + 'routes': ['29.190.93.106', '252.240.25.174'], + 'gateway': '204.70.95.80', + 'dhcp_start': '88.9.225.29', + 'dhcp_end': '71.31.147.105', + 'static_start': '117.137.102.246', + 'static_end': '176.20.227.186', + 'reserved_start': '229.171.15.171', + 'reserved_end': '230.187.248.100' + } + + def test___init__(self): + """Tests basic initialization of VLANNetworkData""" + result = models.VLANNetworkData(self.VLAN_NAME, **self.VLAN_DATA) + self.assertEqual(self.VLAN_NAME, result.name) + self.assertEqual(self.VLAN_DATA['role'], result.role) + self.assertEqual(self.VLAN_DATA['vlan'], result.vlan) + self.assertEqual(self.VLAN_DATA['subnet'], result.subnet) + self.assertEqual(self.VLAN_DATA['routes'], result.routes) + self.assertEqual(self.VLAN_DATA['gateway'], result.gateway) + self.assertEqual(self.VLAN_DATA['dhcp_start'], result.dhcp_start) + self.assertEqual(self.VLAN_DATA['dhcp_end'], result.dhcp_end) + self.assertEqual(self.VLAN_DATA['static_start'], result.static_start) + self.assertEqual(self.VLAN_DATA['static_end'], result.static_end) + self.assertEqual( + self.VLAN_DATA['reserved_start'], result.reserved_start) + self.assertEqual(self.VLAN_DATA['reserved_end'], result.reserved_end) + + def test___init___missing_data(self): + """Tests initialization of VLANNetworkData with missing data + + Any data not explicitly given to VLANNetworkData should be set to None + or an empty list. Since VLANNetworkData can contain a variety of + different settings, many of which are not required, most of these + settings default to None so they will not be outputted when exporting + to a dictionary. + """ + result = models.VLANNetworkData(self.VLAN_NAME) + self.assertEqual(self.VLAN_NAME, result.name) + self.assertEqual(self.VLAN_NAME, result.role) + self.assertIsNone(result.vlan) + self.assertEqual([], result.subnet) + self.assertEqual([], result.routes) + self.assertIsNone(result.dhcp_start) + self.assertIsNone(result.dhcp_end) + self.assertIsNone(result.static_start) + self.assertIsNone(result.static_end) + self.assertIsNone(result.reserved_start) + self.assertIsNone(result.reserved_end) + + def test_dict_from_class(self): + """Tests production of a dictionary from a VLANNetworkData object""" + copy_vlan_data = copy(self.VLAN_DATA) + copy_vlan_data.pop('role') + expected_result = {self.VLAN_DATA['role']: {**copy_vlan_data}} + result = models.VLANNetworkData(self.VLAN_NAME, **self.VLAN_DATA) + self.assertDictEqual(expected_result, result.dict_from_class()) + + def test_merge_additional_data(self): + """Tests merging of additional data into a VLANNetworkData object""" + result = models.VLANNetworkData(self.VLAN_NAME) + self.assertEqual(self.VLAN_NAME, result.name) + self.assertEqual(self.VLAN_NAME, result.role) + self.assertIsNone(result.vlan) + self.assertEqual([], result.subnet) + self.assertEqual([], result.routes) + self.assertIsNone(result.dhcp_start) + self.assertIsNone(result.dhcp_end) + self.assertIsNone(result.static_start) + self.assertIsNone(result.static_end) + self.assertIsNone(result.reserved_start) + self.assertIsNone(result.reserved_end) + result.merge_additional_data(self.VLAN_DATA) + self.assertEqual(self.VLAN_NAME, result.name) + self.assertEqual(self.VLAN_DATA['role'], result.role) + self.assertEqual(self.VLAN_DATA['vlan'], result.vlan) + self.assertEqual(self.VLAN_DATA['subnet'], result.subnet) + self.assertEqual(self.VLAN_DATA['routes'], result.routes) + self.assertEqual(self.VLAN_DATA['gateway'], result.gateway) + self.assertEqual(self.VLAN_DATA['dhcp_start'], result.dhcp_start) + self.assertEqual(self.VLAN_DATA['dhcp_end'], result.dhcp_end) + self.assertEqual(self.VLAN_DATA['static_start'], result.static_start) + self.assertEqual(self.VLAN_DATA['static_end'], result.static_end) + self.assertEqual( + self.VLAN_DATA['reserved_start'], result.reserved_start) + self.assertEqual(self.VLAN_DATA['reserved_end'], result.reserved_end) + + +class TestNetwork(unittest.TestCase): + """Tests for the Network model""" + + VLAN_DATA = { + 'vlan': '23', + 'subnet': ['210.27.143.213', '127.13.31.192'], + 'routes': ['29.190.93.106', '252.240.25.174'], + 'gateway': '204.70.95.80', + 'dhcp_start': '88.9.225.29', + 'dhcp_end': '71.31.147.105', + 'static_start': '117.137.102.246', + 'static_end': '176.20.227.186', + 'reserved_start': '229.171.15.171', + 'reserved_end': '230.187.248.100' + } + BGP_DATA = { + 'asnumber': 64671, + 'ingress_vip': '10.0.220.73', + 'peer_asnumber': 64688 + } + + def setUp(self): + """Sets up a list of VLANNetworkData used to test Network objects""" + self.vlan_network_data = [ + models.VLANNetworkData('oam', **self.VLAN_DATA), + models.VLANNetworkData('oob', **self.VLAN_DATA), + models.VLANNetworkData('pxe', **self.VLAN_DATA) + ] + + def test___init__(self): + """Tests basic initialization of a Network object""" + result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA) + self.assertEqual( + set(self.vlan_network_data), set(result.vlan_network_data)) + self.assertDictEqual(self.BGP_DATA, result.bgp) + + def test_dict_from_class(self): + """Tests production of a dictionary from a Network object""" + expected_result = { + 'bgp': self.BGP_DATA, + 'vlan_network_data': { + 'oam': { + **self.VLAN_DATA + }, + 'oob': { + **self.VLAN_DATA + }, + 'pxe': { + **self.VLAN_DATA + }, + } + } + result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA) + self.assertDictEqual(expected_result, result.dict_from_class()) + + def test_merge_additional_data_bgp(self): + """Tests merging additional BGP data + + BGP data is often set after the initial generation of data objects. + """ + result = models.Network(self.vlan_network_data) + self.assertEqual({}, result.bgp) + result.merge_additional_data({'bgp': self.BGP_DATA}) + self.assertEqual(self.BGP_DATA, result.bgp) + + def test_merge_additional_data_new_vlan_data(self): + """Tests merging of data containing data for a new VLANNetworkData obj + + If a new set of VLANNetworkData is introduced in additional data, + Network should create a new VLANNetworkData object for its list. + """ + result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA) + self.assertIsNone(result.get_vlan_data_by_name('calico')) + new_calico_vlan = {'vlan_network_data': {'calico': {**self.VLAN_DATA}}} + result.merge_additional_data(new_calico_vlan) + self.assertIsNotNone(result.get_vlan_data_by_name('calico')) + + def test_merge_additional_data_new_data_for_existing_vlan(self): + """Tests merging of data for an existing VLANNetworkData object + + If a new set of data for an existing VLANNetworkData role is given by + additional data, Network should merge this new data into the existing + VLANNetworkData object. + """ + result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA) + self.assertEqual( + self.VLAN_DATA['vlan'], + result.get_vlan_data_by_name('oob').vlan) + new_vlan_data = {'vlan_network_data': {'oob': {'vlan': '12'}}} + result.merge_additional_data(new_vlan_data) + self.assertEqual( + new_vlan_data['vlan_network_data']['oob']['vlan'], + result.get_vlan_data_by_name('oob').vlan) + + def test_get_vlan_data_by_name(self): + """Tests retrieval of VLANNetworkData by name attribute""" + result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA) + self.assertEqual( + self.vlan_network_data[1], result.get_vlan_data_by_name('oob')) + + def test_get_vlan_data_by_name_dne(self): + """Tests retrieval of nonexistent name VLANNetworkData""" + result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA) + self.assertIsNone(result.get_vlan_data_by_name('calico')) + + def test_get_vlan_data_by_role(self): + """Tests retrieval of VLANNetworkData by role attribute""" + result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA) + self.assertEqual( + self.vlan_network_data[1], result.get_vlan_data_by_role('oob')) + + def test_get_vlan_data_by_role_dne(self): + """Tests retrieval of nonexistent role VLANNetworkData""" + result = models.Network(self.vlan_network_data, bgp=self.BGP_DATA) + self.assertIsNone(result.get_vlan_data_by_role('calico')) + + +class TestSiteInfo(unittest.TestCase): + """Tests for the SiteInfo model""" + + SITE_NAME = 'Test Site' + SITE_INFO = { + 'physical_location_id': 12345, + 'state': 'MO', + 'country': 'USA', + 'corridor': 'C1', + 'sitetype': 'test', + 'dns': ['210.27.143.213', '127.13.31.192'], + 'ntp': ['29.190.93.106', '252.240.25.174'], + 'domain': 'example.com', + 'ldap': { + 'common_name': 'test', + 'domain': 'example', + 'subdomain': 'test', + 'url': 'ldap://ldap.example.com' + } + } + + def test___init__(self): + """Tests basic initialization of SiteInfo""" + result = models.SiteInfo(self.SITE_NAME, **self.SITE_INFO) + self.assertEqual(self.SITE_NAME, result.name) + self.assertEqual( + self.SITE_INFO['physical_location_id'], + result.physical_location_id) + self.assertEqual(self.SITE_INFO['state'], result.state) + self.assertEqual(self.SITE_INFO['country'], result.country) + self.assertEqual(self.SITE_INFO['corridor'], result.corridor) + self.assertEqual(self.SITE_INFO['sitetype'], result.sitetype) + self.assertEqual(','.join(self.SITE_INFO['dns']), str(result.dns)) + self.assertEqual(','.join(self.SITE_INFO['ntp']), str(result.ntp)) + self.assertEqual(self.SITE_INFO['domain'], result.domain) + self.assertDictEqual(self.SITE_INFO['ldap'], result.ldap) + + def test___init___missing_data(self): + """Tests initailization of SiteInfo with missing data + + If data is not given for SiteInfo attributes, the attributes should + be set to the value given by models.DATA_DEFAULT, an empty list, or + an empty dictionary. + """ + result = models.SiteInfo(self.SITE_NAME) + self.assertEqual(self.SITE_NAME, result.name) + self.assertEqual(models.DATA_DEFAULT, result.physical_location_id) + self.assertEqual(models.DATA_DEFAULT, result.state) + self.assertEqual(models.DATA_DEFAULT, result.country) + self.assertEqual(models.DATA_DEFAULT, result.corridor) + self.assertEqual(models.DATA_DEFAULT, result.sitetype) + self.assertEqual(','.join([]), str(result.dns)) + self.assertEqual(','.join([]), str(result.ntp)) + self.assertEqual(models.DATA_DEFAULT, result.domain) + self.assertDictEqual({}, result.ldap) + + def test_dict_from_class(self): + """Tests production of a dictionary from a SiteInfo object""" + expected_results = copy(self.SITE_INFO) + expected_results['dns'] = ','.join(expected_results['dns']) + expected_results['ntp'] = ','.join(expected_results['ntp']) + expected_results['name'] = self.SITE_NAME + result = models.SiteInfo(self.SITE_NAME, **self.SITE_INFO) + self.assertDictEqual(expected_results, result.dict_from_class()) + + def test_merge_additional_data(self): + """Tests merging of additional data into SiteInfo""" + result = models.SiteInfo(self.SITE_NAME) + self.assertEqual(self.SITE_NAME, result.name) + self.assertEqual(models.DATA_DEFAULT, result.physical_location_id) + self.assertEqual(models.DATA_DEFAULT, result.state) + self.assertEqual(models.DATA_DEFAULT, result.country) + self.assertEqual(models.DATA_DEFAULT, result.corridor) + self.assertEqual(models.DATA_DEFAULT, result.sitetype) + self.assertEqual(','.join([]), str(result.dns)) + self.assertEqual(','.join([]), str(result.ntp)) + self.assertEqual(models.DATA_DEFAULT, result.domain) + self.assertDictEqual({}, result.ldap) + config_dict = copy(self.SITE_INFO) + config_dict['dns'] = {'servers': config_dict['dns']} + config_dict['ntp'] = {'servers': config_dict['ntp']} + config_dict['name'] = 'new_name' + result.merge_additional_data(config_dict) + self.assertEqual(config_dict['name'], result.name) + self.assertEqual( + self.SITE_INFO['physical_location_id'], + result.physical_location_id) + self.assertEqual(self.SITE_INFO['state'], result.state) + self.assertEqual(self.SITE_INFO['country'], result.country) + self.assertEqual(self.SITE_INFO['corridor'], result.corridor) + self.assertEqual(self.SITE_INFO['sitetype'], result.sitetype) + self.assertEqual(','.join(self.SITE_INFO['dns']), str(result.dns)) + self.assertEqual(','.join(self.SITE_INFO['ntp']), str(result.ntp)) + self.assertEqual(self.SITE_INFO['domain'], result.domain) + self.assertDictEqual(self.SITE_INFO['ldap'], result.ldap) + + +class TestSiteDocumentData(unittest.TestCase): + """Tests for the SiteDocumentData model""" + + STORAGE_DICT = {'ceph': {'controller': {'osd_count': 6}}} + + @mock.patch('spyglass.data_extractor.models.SiteInfo') + @mock.patch('spyglass.data_extractor.models.Network') + @mock.patch('spyglass.data_extractor.models.Rack') + def test___init__(self, Rack, Network, SiteInfo): + """Tests basic initialization of SiteDocumentData""" + site_info = SiteInfo() + network = Network() + baremetal = [Rack(), Rack(), Rack()] + result = models.SiteDocumentData( + site_info, network, baremetal, self.STORAGE_DICT) + self.assertEqual(site_info, result.site_info) + self.assertEqual(network, result.network) + self.assertEqual(set(baremetal), set(result.baremetal)) + self.assertDictEqual(self.STORAGE_DICT, result.storage) + + @mock.patch('spyglass.data_extractor.models.SiteInfo') + @mock.patch('spyglass.data_extractor.models.Network') + @mock.patch('spyglass.data_extractor.models.Rack') + def test_dict_from_class(self, Rack, Network, SiteInfo): + """Tests production of a dictionary from a SiteDocumentData object""" + mock_site_info_data = {'name': 'test', 'country': 'USA'} + mock_network_data = { + 'bgp': 'bgp_data', + 'vlan_network_data': 'vlan_data' + } + mock_baremetal0_data = {'rack1': {'host1': 'data'}} + mock_baremetal1_data = {'rack2': {'host2': 'data'}} + + site_info = SiteInfo() + site_info.dict_from_class.return_value = mock_site_info_data + network = Network() + network.dict_from_class.return_value = mock_network_data + baremetal = [Rack(), Rack()] + Rack().dict_from_class.side_effect = \ + [mock_baremetal0_data, mock_baremetal1_data] + + expected_result = { + 'baremetal': { + **mock_baremetal0_data, + **mock_baremetal1_data + }, + 'network': mock_network_data, + 'site_info': mock_site_info_data, + 'storage': self.STORAGE_DICT + } + + result = models.SiteDocumentData( + site_info, network, baremetal, self.STORAGE_DICT) + self.assertDictEqual(expected_result, result.dict_from_class()) + + @mock.patch('spyglass.data_extractor.models.SiteInfo') + @mock.patch('spyglass.data_extractor.models.Network') + @mock.patch('spyglass.data_extractor.models.Rack') + def test_merge_additional_data_storage(self, Rack, Network, SiteInfo): + """Tests merging of storage data dictionary + + Storage data is often given after initialization of data objects. + """ + site_info = SiteInfo() + network = Network() + baremetal = [Rack(), Rack(), Rack()] + result = models.SiteDocumentData(site_info, network, baremetal) + self.assertIsNone(result.storage) + result.merge_additional_data({'storage': self.STORAGE_DICT}) + self.assertDictEqual(self.STORAGE_DICT, result.storage) + + @mock.patch('spyglass.data_extractor.models.SiteInfo') + @mock.patch('spyglass.data_extractor.models.Network') + @mock.patch('spyglass.data_extractor.models.Rack') + def test_get_baremetal_rack_by_name(self, Rack, Network, SiteInfo): + """Tests retrieval of baremetal rack by name""" + site_info = SiteInfo() + network = Network() + baremetal = [Rack(), Rack(), Rack()] + type(Rack()).name = mock.PropertyMock( + side_effect=['rack1', 'rack2', 'rack3']) + result = models.SiteDocumentData(site_info, network, baremetal) + self.assertIsNotNone(result.get_baremetal_rack_by_name('rack2')) + + @mock.patch('spyglass.data_extractor.models.SiteInfo') + @mock.patch('spyglass.data_extractor.models.Network') + @mock.patch('spyglass.data_extractor.models.Rack') + def test_get_baremetal_rack_by_name_dne(self, Rack, Network, SiteInfo): + """Tests retrieval of nonexistent baremetal rack by name""" + site_info = SiteInfo() + network = Network() + baremetal = [Rack(), Rack()] + type(Rack()).name = mock.PropertyMock(side_effect=['rack1', 'rack3']) + result = models.SiteDocumentData(site_info, network, baremetal) + self.assertIsNone(result.get_baremetal_rack_by_name('rack2')) diff --git a/tox.ini b/tox.ini index 54ed7be..60481e6 100644 --- a/tox.ini +++ b/tox.ini @@ -72,6 +72,6 @@ deps = commands = bash -c 'PATH=$PATH:~/.local/bin; pytest --cov=spyglass --cov-report \ html:cover --cov-report xml:cover/coverage.xml --cov-report term \ - --cov-fail-under 50 tests/' + --cov-fail-under 60 tests/' whitelist_externals = bash