Add tests for parsing engine

Increases test coverage of the Spyglass parser engine from 0% to 82%,
bringing overall test coverage to 86.4%. This change moves minimum
coverage to 84%.

Change-Id: I2de496b8d7f4c4252be22c713605fae6fd565b66
This commit is contained in:
Ian H Pittwood 2019-07-16 16:18:32 -05:00 committed by Ian Pittwood
parent 4dd1cea32a
commit b7c2bc7ccd
8 changed files with 333 additions and 1 deletions

View File

@ -120,6 +120,7 @@ class IPList(object):
}.items() }.items()
def set_ip_by_role(self, role: str, new_value): def set_ip_by_role(self, role: str, new_value):
# TODO(ian-pittwood): use setattr here?
if role == 'oob': if role == 'oob':
self.oob = _parse_ip(new_value) self.oob = _parse_ip(new_value)
elif role == 'oam': elif role == 'oam':

View File

@ -41,6 +41,7 @@ class ProcessDataSource(object):
return raw_data return raw_data
def _initialize_intermediary(self): def _initialize_intermediary(self):
# TODO(ian-pittwood): Define these in init, remove this function
self.host_type = {} self.host_type = {}
self.data = { self.data = {
"network": {}, "network": {},
@ -73,6 +74,8 @@ class ProcessDataSource(object):
return network_subnets return network_subnets
def _get_genesis_node_details(self): def _get_genesis_node_details(self):
# TODO(ian-pittwood): Use get_baremetal_host_by_type instead
# TODO(ian-pittwood): Below should be docstring, not comment
# Get genesis host node details from the hosts based on host type # Get genesis host node details from the hosts based on host type
for rack in self.data.baremetal: for rack in self.data.baremetal:
for host in rack.hosts: for host in rack.hosts:
@ -90,6 +93,7 @@ class ProcessDataSource(object):
The method validates this with regex pattern defined for each The method validates this with regex pattern defined for each
data type. data type.
""" """
# TODO(ian-pittwood): Implement intermediary validation or remove
LOG.info("Validating Intermediary data") LOG.info("Validating Intermediary data")
# Performing a deep copy # Performing a deep copy
@ -139,6 +143,12 @@ class ProcessDataSource(object):
""" """
LOG.info("Apply design rules") LOG.info("Apply design rules")
# TODO(ian-pittwood): Use more robust path creation methods such
# as os.path.join. We may also want to let
# users specify these in cli opts. We also need
# better guidelines over how to write these rules
# and how they are applied.
rules_dir = resource_filename("spyglass", "config/") rules_dir = resource_filename("spyglass", "config/")
rules_file = rules_dir + "rules.yaml" rules_file = rules_dir + "rules.yaml"
rules_data_raw = self._read_file(rules_file) rules_data_raw = self._read_file(rules_file)
@ -158,6 +168,10 @@ class ProcessDataSource(object):
# information is already present in plugin data. # information is already present in plugin data.
# This function shall be defined if plugin data source # This function shall be defined if plugin data source
# doesn't provide host profile information. # doesn't provide host profile information.
# TODO(ian-pittwood): Should be implemented as it is outside of
# our plugin packages. Logic can be implemented
# to ensure proper data processing.
pass pass
def _apply_rule_hardware_profile(self, rule_data): def _apply_rule_hardware_profile(self, rule_data):
@ -212,6 +226,7 @@ class ProcessDataSource(object):
host_idx = 0 host_idx = 0
LOG.info("Update baremetal host ip's") LOG.info("Update baremetal host ip's")
# TODO(ian-pittwood): this can be redone to be cleaner with models
for rack in self.data.baremetal: for rack in self.data.baremetal:
for host in rack.hosts: for host in rack.hosts:
for net_type, net_ip in iter(host.ip): for net_type, net_ip in iter(host.ip):
@ -219,6 +234,7 @@ class ProcessDataSource(object):
host.ip.set_ip_by_role( host.ip.set_ip_by_role(
net_type, str(ips[host_idx + default_ip_offset])) net_type, str(ips[host_idx + default_ip_offset]))
host_idx += 1 host_idx += 1
return
def _update_vlan_net_data(self, rule_data): def _update_vlan_net_data(self, rule_data):
"""Offset allocation rules to determine ip address range(s) """Offset allocation rules to determine ip address range(s)
@ -296,6 +312,8 @@ class ProcessDataSource(object):
pprint.pformat(vlan_network_data_.dict_from_class()))) pprint.pformat(vlan_network_data_.dict_from_class())))
def load_extracted_data_from_data_source(self, extracted_data): def load_extracted_data_from_data_source(self, extracted_data):
# TODO(ian-pittwood): Remove this and use init
"""Function called from cli.py to pass extracted data """Function called from cli.py to pass extracted data
from input data source from input data source

35
tests/conftest.py Normal file
View File

@ -0,0 +1,35 @@
# Copyright 2019 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import pytest
import yaml
from spyglass.data_extractor.models import site_document_data_factory
FIXTURE_DIR = os.path.join(os.path.dirname(__file__), 'shared')
@pytest.fixture(scope='class')
def site_document_data_objects(request):
with open(os.path.join(FIXTURE_DIR, 'test_intermediary.yaml'), 'r') as f:
yaml_data = yaml.safe_load(f)
request.cls.site_document_data = site_document_data_factory(yaml_data)
@pytest.fixture(scope='class')
def rules_data(request):
with open(os.path.join(FIXTURE_DIR, 'rules.yaml'), 'r') as f:
request.cls.rules_data = yaml.safe_load(f)

38
tests/shared/rules.yaml Normal file
View File

@ -0,0 +1,38 @@
###########################
# Global Rules #
###########################
#Rule1: ip_alloc_offset
# Specifies the number of ip addresses to offset from
# the start of subnet allocation pool while allocating it to host.
# -for vlan it is set to 12 as default.
# -for oob it is 10
# -for all gateway ip addresss it is set to 1.
# -for ingress vip it is 1
# -for static end (non pxe) it is -1( means one but last ip of the pool)
# -for dhcp end (pxe only) it is -2( 3rd from the last ip of the pool)
#Rule2: host_profile_interfaces.
# Specifies the network interfaces type and
# and their names for a particular hw profile
#Rule3: hardware_profile
# This specifies the profile details bases on sitetype.
# It specifies the profile name and host type for compute,
# controller along with hw type
---
rule_ip_alloc_offset:
name: ip_alloc_offset
ip_alloc_offset:
default: 12
oob: 10
gateway: 1
ingress_vip: 1
static_ip_end: -2
dhcp_ip_end: -2
rule_hardware_profile:
name: hardware_profile
hardware_profile:
foundry:
profile_name:
compute: dp-r720
ctrl: cp-r720
hw_type: dell_r720
...

View File

View File

View File

@ -0,0 +1,240 @@
# Copyright 2019 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
from unittest import mock
from netaddr import IPNetwork
from pytest import mark
from spyglass.parser.engine import ProcessDataSource
FIXTURE_DIR = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'shared')
@mark.usefixtures('tmpdir')
@mark.usefixtures('site_document_data_objects')
@mark.usefixtures('rules_data')
class TestProcessDataSource(unittest.TestCase):
REGION_NAME = 'test'
def test___init__(self):
with mock.patch.object(
ProcessDataSource,
'_initialize_intermediary') as mock__initialize_intermediary:
obj = ProcessDataSource(self.REGION_NAME)
self.assertEqual(self.REGION_NAME, obj.region_name)
mock__initialize_intermediary.assert_called_once()
def test__read_file(self):
test_file = os.path.join(FIXTURE_DIR, 'site_config.yaml')
with open(test_file, 'r') as f:
expected_data = f.read()
data = ProcessDataSource._read_file(test_file)
self.assertEqual(expected_data, data)
def test__initialize_intermediary(self):
expected_data = {
"network": {},
"baremetal": {},
"region_name": "",
"storage": {},
"site_info": {},
}
obj = ProcessDataSource(self.REGION_NAME)
obj._initialize_intermediary()
self.assertDictEqual({}, obj.host_type)
self.assertDictEqual(expected_data, obj.data)
self.assertIsNone(obj.sitetype)
self.assertIsNone(obj.genesis_node)
self.assertIsNone(obj.region_name)
self.assertIsNone(obj.network_subnets)
def test__get_network_subnets(self):
expected_result = {
'calico': IPNetwork('30.29.1.0/25'),
'oam': IPNetwork('10.0.220.0/26'),
'oob': IPNetwork('10.0.220.128/27'),
'overlay': IPNetwork('30.19.0.0/25'),
'pxe': IPNetwork('30.30.4.0/25'),
'storage': IPNetwork('30.31.1.0/25')
}
obj = ProcessDataSource(self.REGION_NAME)
obj.load_extracted_data_from_data_source(self.site_document_data)
result = obj._get_network_subnets()
self.assertDictEqual(expected_result, result)
def test__get_genesis_node_details(self):
expected_result = self.site_document_data.get_baremetal_host_by_type(
'genesis')[0]
obj = ProcessDataSource(self.REGION_NAME)
obj.load_extracted_data_from_data_source(self.site_document_data)
obj._get_genesis_node_details()
self.assertEqual(expected_result, obj.genesis_node)
@unittest.skip('Not in use.')
def test__validate_intermediary_data(self):
pass
@mock.patch.object(ProcessDataSource, '_apply_rule_ip_alloc_offset')
@mock.patch.object(ProcessDataSource, '_apply_rule_hardware_profile')
def test__apply_design_rules(
self, mock_rule_hw_profile, mock_rule_ip_alloc_offset):
obj = ProcessDataSource(self.REGION_NAME)
obj.load_extracted_data_from_data_source(self.site_document_data)
obj._apply_design_rules()
mock_rule_hw_profile.assert_called_once()
mock_rule_ip_alloc_offset.assert_called_once()
@unittest.skip('Not implemented.')
def test__apply_rule_host_profiles_interfaces(self):
pass
def test__apply_rule_hardware_profile(self):
input_rules = self.rules_data['rule_hardware_profile'][
'hardware_profile']
obj = ProcessDataSource(self.REGION_NAME)
obj.load_extracted_data_from_data_source(self.site_document_data)
obj._apply_rule_hardware_profile(input_rules)
self.assertEqual(
1, len(obj.data.get_baremetal_host_by_type('genesis')))
self.assertEqual(
3, len(obj.data.get_baremetal_host_by_type('controller')))
self.assertEqual(
8, len(obj.data.get_baremetal_host_by_type('compute')))
for host in obj.data.get_baremetal_host_by_type('genesis'):
self.assertEqual('cp-r720', host.host_profile)
for host in obj.data.get_baremetal_host_by_type('controller'):
self.assertEqual('cp-r720', host.host_profile)
for host in obj.data.get_baremetal_host_by_type('compute'):
self.assertEqual('dp-r720', host.host_profile)
@mock.patch.object(ProcessDataSource, '_update_baremetal_host_ip_data')
@mock.patch.object(ProcessDataSource, '_update_vlan_net_data')
@mock.patch.object(
ProcessDataSource, '_get_network_subnets', return_value='success')
def test__apply_rule_ip_alloc_offset(
self, mock__get_network_subnets, mock__update_vlan_net_data,
mock__update_baremetal_host_ip_data):
obj = ProcessDataSource(self.REGION_NAME)
obj.load_extracted_data_from_data_source(self.site_document_data)
obj._apply_rule_ip_alloc_offset(self.rules_data)
self.assertEqual('success', obj.network_subnets)
mock__get_network_subnets.assert_called_once()
mock__update_vlan_net_data.assert_called_once_with(self.rules_data)
mock__update_baremetal_host_ip_data.assert_called_once_with(
self.rules_data)
def test__update_baremetal_host_ip_data(self):
obj = ProcessDataSource(self.REGION_NAME)
obj.load_extracted_data_from_data_source(self.site_document_data)
obj.network_subnets = obj._get_network_subnets()
ip_alloc_offset_rules = self.rules_data['rule_ip_alloc_offset'][
'ip_alloc_offset']
obj._update_baremetal_host_ip_data(ip_alloc_offset_rules)
counter = 0
for rack in obj.data.baremetal:
for host in rack.hosts:
for net_type, net_ip in iter(host.ip):
ips = list(obj.network_subnets[net_type])
self.assertEqual(
str(ips[counter + ip_alloc_offset_rules['default']]),
net_ip)
counter += 1
def test__update_vlan_net_data(self):
ip_alloc_offset_rules = self.rules_data['rule_ip_alloc_offset'][
'ip_alloc_offset']
obj = ProcessDataSource(self.REGION_NAME)
obj.load_extracted_data_from_data_source(self.site_document_data)
obj.network_subnets = obj._get_network_subnets()
obj._update_vlan_net_data(ip_alloc_offset_rules)
ingress_data = obj.data.network.get_vlan_data_by_name('ingress')
subnet = IPNetwork(ingress_data.subnet[0])
ips = list(subnet)
self.assertEqual(
str(ips[ip_alloc_offset_rules['ingress_vip']]),
obj.data.network.bgp['ingress_vip'])
self.assertEqual(
ingress_data.subnet[0],
obj.data.network.bgp['public_service_cidr'])
subnets = obj.network_subnets
for vlan in self.site_document_data.network.vlan_network_data:
if vlan.role == 'ingress':
continue
ips = list(subnets[vlan.role])
self.assertEqual(
str(ips[ip_alloc_offset_rules['gateway']]), vlan.gateway)
if vlan.role == 'oob':
ip_offset = ip_alloc_offset_rules['oob']
else:
ip_offset = ip_alloc_offset_rules['default']
self.assertEqual(str(ips[1]), vlan.reserved_start)
self.assertEqual(str(ips[ip_offset]), vlan.reserved_end)
self.assertEqual(str(ips[ip_offset + 1]), vlan.static_start)
if vlan.role == 'pxe':
self.assertEqual(
str(ips[(len(ips) // 2) - 1]), vlan.static_end)
self.assertEqual(str(ips[len(ips) // 2]), vlan.dhcp_start)
self.assertEqual(
str(ips[ip_alloc_offset_rules['dhcp_ip_end']]),
vlan.dhcp_end)
else:
self.assertEqual(
str(ips[ip_alloc_offset_rules['static_ip_end']]),
vlan.static_end)
if vlan.role == 'oam':
self.assertEqual(['0.0.0.0/0'], vlan.routes)
else:
self.assertEqual([], vlan.routes)
def test_load_extracted_data_from_data_source(self):
obj = ProcessDataSource(self.REGION_NAME)
obj.load_extracted_data_from_data_source(self.site_document_data)
self.assertEqual(self.site_document_data, obj.data)
@mock.patch('yaml.dump', return_value='success')
def test_dump_intermediary_file(self, mock_dump):
obj = ProcessDataSource(self.REGION_NAME)
obj.load_extracted_data_from_data_source(self.site_document_data)
mock_open = mock.mock_open()
with mock.patch('spyglass.parser.engine.open', mock_open):
obj.dump_intermediary_file(None)
mock_dump.assert_called_once_with(
self.site_document_data.dict_from_class(),
default_flow_style=False)
mock_open.return_value.write.assert_called_once()
mock_open.return_value.close.assert_called_once()
@mock.patch.object(ProcessDataSource, '_apply_design_rules')
@mock.patch.object(ProcessDataSource, '_get_genesis_node_details')
def test_generate_intermediary_yaml(
self, mock__apply_design_rules, mock__get_genesis_node_details):
obj = ProcessDataSource(self.REGION_NAME)
obj.load_extracted_data_from_data_source(self.site_document_data)
result = obj.generate_intermediary_yaml()
self.assertEqual(self.site_document_data, result)
mock__apply_design_rules.assert_called_once()
mock__get_genesis_node_details.assert_called_once()

View File

@ -89,6 +89,6 @@ commands =
pipenv install --dev pipenv install --dev
bash -c 'PATH=$PATH:~/.local/bin; pytest --cov=spyglass \ bash -c 'PATH=$PATH:~/.local/bin; pytest --cov=spyglass \
--cov-report html:cover --cov-report xml:cover/coverage.xml \ --cov-report html:cover --cov-report xml:cover/coverage.xml \
--cov-report term --cov-fail-under 60 tests/' --cov-report term --cov-fail-under 84 tests/'
whitelist_externals = whitelist_externals =
bash bash