bug(tests): Unskip Armada API unit tests

This PS sets the foundation for unskipping Armada API unit tests
by making necessary framework changes where necessary as well
as test refactoring so that the tests not only can be unskipped
but also execute successfully.

However, until a feature-rich testing framework is in place, it's
not possible to do end-to-end testing as mocking is currently used
to stub out Tiller API.

Negative RBAC tests will be added in a follow-up.

Included in this PS:
  - policy fixture for asserting that expected policies are enforced
  - unit tests for tiller/armada API
  - test_utils module

Change-Id: I2f454b27b014875bc35fd706f7c0d05364ce562a
This commit is contained in:
Felipe Monteiro 2017-12-05 21:16:11 +00:00
parent 9318c0cf88
commit 4b3d843f04
19 changed files with 677 additions and 149 deletions

View File

@ -14,7 +14,6 @@
import json
import logging as log
import os
import uuid
import yaml
@ -22,19 +21,12 @@ import falcon
from oslo_config import cfg
from oslo_log import log as logging
from armada import const
from armada.conf import set_default_for_default_log_levels
CONF = cfg.CONF
class BaseResource(object):
def __init__(self):
if not (os.path.exists(const.CONFIG_PATH)):
set_default_for_default_log_levels()
logging.setup(CONF, 'armada')
self.logger = logging.getLogger(__name__)
def on_options(self, req, resp):

View File

@ -65,9 +65,7 @@ class Release(api.BaseResource):
releases = {}
for release in tiller.list_releases():
if not releases.get(release.namespace, None):
releases[release.namespace] = []
releases.setdefault(release.namespace, [])
releases[release.namespace].append(release.name)
resp.body = json.dumps({'releases': releases})

View File

@ -14,6 +14,7 @@
import falcon
from oslo_config import cfg
from oslo_policy import policy
from armada import conf
from armada.api import ArmadaRequest
@ -27,18 +28,19 @@ from armada.api.controller.health import Health
from armada.api.controller.tiller import Release
from armada.api.controller.tiller import Status
from armada.api.controller.validation import Validate
from armada.common import policy
conf.set_app_default_configs()
CONF = cfg.CONF
# Build API
def create(middleware=CONF.middleware):
def create(enable_middleware=CONF.middleware):
"""Entry point for intializing Armada server.
policy.setup_policy()
:param enable_middleware: Whether to enable middleware.
:type enable_middleware: bool
"""
if middleware:
if enable_middleware:
api = falcon.API(
request_type=ArmadaRequest,
middleware=[
@ -63,11 +65,14 @@ def create(middleware=CONF.middleware):
for route, service in url_routes_v1:
api.add_route("/api/v1.0/{}".format(route), service)
# Initialize policy config options.
policy.Enforcer(CONF)
return api
def paste_start_armada(global_conf, **kwargs):
# At this time just ignore everything in the paste configuration
# At this time just ignore everything in the paste configuration
# and rely on olso_config
return api

View File

@ -15,14 +15,12 @@
from oslo_config import cfg
from oslo_log import log as logging
from armada import conf
conf.set_app_default_configs()
conf.set_default_for_default_log_levels()
CONF = cfg.CONF
LOG = logging.getLogger(__name__)

View File

@ -23,6 +23,13 @@ CONF = cfg.CONF
_ENFORCER = None
def reset_policy():
global _ENFORCER
if _ENFORCER:
_ENFORCER.clear()
_ENFORCER = None
def setup_policy():
global _ENFORCER
if not _ENFORCER:
@ -30,7 +37,7 @@ def setup_policy():
register_rules(_ENFORCER)
def enforce_policy(action, target, credentials, do_raise=True):
def _enforce_policy(action, target, credentials, do_raise=True):
extras = {}
if do_raise:
extras.update(exc=exc.ActionForbidden, do_raise=do_raise)
@ -46,7 +53,7 @@ def enforce(rule):
@functools.wraps(func)
def handler(*args, **kwargs):
context = args[1].context
enforce_policy(rule, {}, context, do_raise=True)
_enforce_policy(rule, {}, context, do_raise=True)
return func(*args, **kwargs)
return handler

View File

@ -1,4 +1,4 @@
# Copyright 2017 The Armada Authors.
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -22,12 +22,24 @@ from armada import const
CONF = cfg.CONF
# Load config file if exists
if (os.path.exists(const.CONFIG_PATH)):
CONF(['--config-file', const.CONFIG_PATH])
CONFIG_FILES = ['api-paste.ini', 'armada.conf']
def _get_config_files(env=None):
if env is None:
env = os.environ
dirname = env.get('OS_ARMADA_CONFIG_DIR', const.CONFIG_PATH).strip()
config_files = [
os.path.join(dirname, config_file) for config_file in CONFIG_FILES
]
return config_files
def set_app_default_configs():
config_files = _get_config_files()
if all([os.path.exists(x) for x in config_files]):
CONF([], project='armada', default_config_files=config_files)
set_default_for_default_log_levels()
default.register_opts(CONF)
@ -43,7 +55,6 @@ def set_default_for_default_log_levels():
'kubernetes.client.rest=INFO'
]
log.register_options(CONF)
log.set_defaults(
default_log_levels=log.get_default_log_levels() +
extra_log_level_defaults)

View File

@ -12,12 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from keystoneauth1 import loading
from oslo_config import cfg
from keystoneauth1 import loading
from armada.conf import utils
default_options = [
cfg.ListOpt(
@ -44,7 +44,7 @@ Absolute path to the certificate file to use for chart registries
cfg.BoolOpt(
'middleware',
default='true',
default=True,
help=utils.fmt("""
Enables or disables Keystone authentication middleware.
""")),

View File

@ -30,4 +30,4 @@ STATUS_DEPLOYED = 'DEPLOYED'
STATUS_FAILED = 'FAILED'
# Configuration File
CONFIG_PATH = '/etc/armada/armada.conf'
CONFIG_PATH = '/etc/armada'

View File

@ -59,7 +59,6 @@ class Armada(object):
Initialize the Armada Engine and establish
a connection to Tiller
'''
self.disable_update_pre = disable_update_pre
self.disable_update_post = disable_update_post
self.enable_chart_cleanup = enable_chart_cleanup

View File

@ -0,0 +1,88 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2017 AT&T Intellectual Property.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import random
import string
import uuid
def rand_uuid_hex():
"""Generate a random UUID hex string
:return: a random UUID (e.g. '0b98cf96d90447bda4b46f31aeb1508c')
:rtype: string
"""
return uuid.uuid4().hex
def rand_name(name='', prefix='armada'):
"""Generate a random name that includes a random number
:param str name: The name that you want to include
:param str prefix: The prefix that you want to include
:return: a random name. The format is
'<prefix>-<name>-<random number>'.
(e.g. 'prefixfoo-namebar-154876201')
:rtype: string
"""
randbits = str(random.randint(1, 0x7fffffff))
rand_name = randbits
if name:
rand_name = name + '-' + rand_name
if prefix:
rand_name = prefix + '-' + rand_name
return rand_name
def rand_bool():
"""Generate a random boolean value.
:return: a random boolean value.
:rtype: boolean
"""
return random.choice([True, False])
def rand_int(min, max):
"""Generate a random integer value between range (`min`, `max`).
:return: a random integer between the range(`min`, `max`).
:rtype: integer
"""
return random.randint(min, max)
def rand_password(length=15):
"""Generate a random password
:param int length: The length of password that you expect to set
(If it's smaller than 3, it's same as 3.)
:return: a random password. The format is
'<random upper letter>-<random number>-<random special character>
-<random ascii letters or digit characters or special symbols>'
(e.g. 'G2*ac8&lKFFgh%2')
:rtype: string
"""
upper = random.choice(string.ascii_uppercase)
ascii_char = string.ascii_letters
digits = string.digits
digit = random.choice(string.digits)
puncs = '~!@#%^&*_=+'
punc = random.choice(puncs)
seed = ascii_char + digits + puncs
pre = upper + digit + punc
password = pre + ''.join(random.choice(seed) for x in range(length - 3))
return password

View File

@ -0,0 +1,50 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import os
from falcon import testing as falcon_testing
import mock
import armada.conf
from armada.tests.unit import base as test_base
from armada.tests.unit import fixtures
class BaseControllerTest(test_base.ArmadaTestCase):
"""Base class for unit testing falcon controllers."""
def setUp(self):
super(BaseControllerTest, self).setUp()
# Override the default configuration file lookup with references to
# the sample configuration files to avoid oslo.conf errors when
# creating the server below.
current_dir = os.path.dirname(os.path.realpath(__file__))
sample_conf_dir = os.path.join(current_dir, os.pardir, os.pardir,
os.pardir, os.pardir, 'etc', 'armada')
sample_conf_files = ['api-paste.ini', 'armada.conf.sample']
with mock.patch.object(
armada.conf, '_get_config_files') as mock_get_config_files:
mock_get_config_files.return_value = [
os.path.join(sample_conf_dir, x) for x in sample_conf_files
]
# FIXME(fmontei): Workaround for the fact that `armada.api` always
# calls `create()` via `api = create()` at the bottom of the module
# which invokes oslo.conf functionality that has yet to be set up
# properly in this module.
server = importlib.import_module('armada.api.server')
self.app = falcon_testing.TestClient(
server.create(enable_middleware=False))
self.policy = self.useFixture(fixtures.RealPolicyFixture())

View File

@ -1,114 +0,0 @@
# Copyright 2017 The Armada Authors.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import mock
import unittest
import falcon
from falcon import testing
from oslo_config import cfg
from armada import conf
from armada.api import server
CONF = cfg.CONF
conf.set_app_default_configs()
class APITestCase(testing.TestCase):
def setUp(self):
super(APITestCase, self).setUp()
self.app = server.create(middleware=False)
class TestAPI(APITestCase):
@unittest.skip('this is incorrectly tested')
@mock.patch('armada.api.armada_controller.Handler')
def test_armada_apply(self, mock_armada):
'''
Test /api/v1.0/apply endpoint
'''
mock_armada.sync.return_value = None
body = json.dumps({'file': '',
'options': {'debug': 'true',
'disable_update_pre': 'false',
'disable_update_post': 'false',
'enable_chart_cleanup': 'false',
'skip_pre_flight': 'false',
'dry_run': 'false',
'wait': 'false',
'timeout': '100'}})
doc = {u'message': u'Success'}
result = self.simulate_post(path='/api/v1.0/apply', body=body)
self.assertEqual(result.json, doc)
@unittest.skip('Test does not handle auth/policy correctly')
@mock.patch('armada.api.tiller_controller.Tiller')
def test_tiller_status(self, mock_tiller):
'''
Test /status endpoint
Test /api/v1.0/status endpoint
'''
# Mock tiller status value
mock_tiller.tiller_status.return_value = 'Active'
# FIXME(lamt) This variable is unused. Uncomment when it is.
# doc = {u'message': u'Tiller Server is Active'}
result = self.simulate_get('/api/v1.0/status')
# TODO(lamt) This should be HTTP_401 if no auth is happening, but auth
# is not implemented currently, so it falls back to a policy check
# failure, thus a 403. Change this once it is completed
# Fails due to invalid access
self.assertEqual(falcon.HTTP_403, result.status)
# FIXME(lamt) Need authentication - mock, fixture
# self.assertEqual(result.json, doc)
@unittest.skip('Test does not handle auth/policy correctly')
@mock.patch('armada.api.tiller_controller.Tiller')
def test_tiller_releases(self, mock_tiller):
'''
Test /api/v1.0/releases endpoint
'''
# Mock tiller status value
mock_tiller.list_releases.return_value = None
# FIXME(lamt) This variable is unused. Uncomment when it is.
# doc = {u'releases': {}}
result = self.simulate_get('/api/v1.0/releases')
# TODO(lamt) This should be HTTP_401 if no auth is happening, but auth
# is not implemented currently, so it falls back to a policy check
# failure, thus a 403. Change this once it is completed
self.assertEqual(falcon.HTTP_403, result.status)
# FIXME(lamt) Need authentication - mock, fixture
# self.assertEqual(result.json, doc)
def test_health_endpoint(self):
"""
Validate that /api/v1.0/health returns 204.
"""
result = self.simulate_get('/api/v1.0/health')
self.assertEqual(result.status, falcon.HTTP_204)

View File

@ -0,0 +1,58 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import mock
from oslo_config import cfg
from armada.handlers import armada
from armada.tests.unit.api import base
CONF = cfg.CONF
class ArmadaControllerTest(base.BaseControllerTest):
@mock.patch.object(armada, 'lint')
@mock.patch.object(armada, 'Manifest')
@mock.patch.object(armada, 'Tiller')
def test_armada_apply_resource(self, mock_tiller, mock_manifest,
mock_lint):
"""Tests the POST /api/v1.0/apply endpoint."""
rules = {'armada:create_endpoints': '@'}
self.policy.set_rules(rules)
options = {'debug': 'true',
'disable_update_pre': 'false',
'disable_update_post': 'false',
'enable_chart_cleanup': 'false',
'skip_pre_flight': 'false',
'dry_run': 'false',
'wait': 'false',
'timeout': '100'}
payload = {'file': '', 'options': options}
body = json.dumps(payload)
expected = {'message': {'diff': [], 'install': [], 'upgrade': []}}
result = self.app.simulate_post(path='/api/v1.0/apply', body=body)
self.assertEqual(result.json, expected)
self.assertEqual('application/json', result.headers['content-type'])
mock_tiller.assert_called_once_with(tiller_host=None,
tiller_port=44134)
mock_manifest.assert_called_once_with([payload])
mock_lint.validate_armada_documents.assert_called_once_with([payload])
fake_manifest = mock_manifest.return_value.get_manifest.return_value
mock_lint.validate_armada_object.assert_called_once_with(fake_manifest)

View File

@ -0,0 +1,27 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
from armada.tests.unit.api import base
class HealthControllerTest(base.BaseControllerTest):
def test_get_health_status(self):
"""
Validate that /api/v1.0/health returns 204.
"""
result = self.app.simulate_get('/api/v1.0/health')
self.assertEqual(result.status, falcon.HTTP_204)

View File

@ -0,0 +1,117 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo_config import cfg
from armada.api.controller import tiller as tiller_controller
from armada.tests.unit.api import base
CONF = cfg.CONF
class TillerControllerTest(base.BaseControllerTest):
@mock.patch.object(tiller_controller, 'Tiller')
def test_get_tiller_status(self, mock_tiller):
"""Tests GET /api/v1.0/status endpoint."""
rules = {'tiller:get_status': '@'}
self.policy.set_rules(rules)
mock_tiller.return_value.tiller_status.return_value = 'fake_status'
mock_tiller.return_value.tiller_version.return_value = 'fake_verson'
result = self.app.simulate_get('/api/v1.0/status')
expected = {
'tiller': {'version': 'fake_verson', 'state': 'fake_status'}
}
self.assertEqual(expected, result.json)
self.assertEqual('application/json', result.headers['content-type'])
mock_tiller.assert_called_once_with(tiller_host=None, tiller_port=None)
@mock.patch.object(tiller_controller, 'Tiller')
def test_get_tiller_status_with_params(self, mock_tiller):
"""Tests GET /api/v1.0/status endpoint with query parameters."""
rules = {'tiller:get_status': '@'}
self.policy.set_rules(rules)
mock_tiller.return_value.tiller_status.return_value = 'fake_status'
mock_tiller.return_value.tiller_version.return_value = 'fake_verson'
result = self.app.simulate_get('/api/v1.0/status',
params_csv=False,
params={'tiller_host': 'fake_host',
'tiller_port': '98765'})
expected = {
'tiller': {'version': 'fake_verson', 'state': 'fake_status'}
}
self.assertEqual(expected, result.json)
self.assertEqual('application/json', result.headers['content-type'])
mock_tiller.assert_called_once_with(tiller_host='fake_host',
tiller_port='98765')
@mock.patch.object(tiller_controller, 'Tiller')
def test_tiller_releases(self, mock_tiller):
"""Tests GET /api/v1.0/releases endpoint."""
rules = {'tiller:get_release': '@'}
self.policy.set_rules(rules)
def _get_fake_release(name, namespace):
fake_release = mock.Mock(namespace='%s_namespace' % namespace)
fake_release.configure_mock(name=name)
return fake_release
mock_tiller.return_value.list_releases.return_value = [
_get_fake_release('foo', 'bar'), _get_fake_release('baz', 'qux')
]
result = self.app.simulate_get('/api/v1.0/releases')
expected = {
'releases': {'bar_namespace': ['foo'], 'qux_namespace': ['baz']}
}
self.assertEqual(expected, result.json)
mock_tiller.assert_called_once_with(tiller_host=None, tiller_port=None)
mock_tiller.return_value.list_releases.assert_called_once_with()
@mock.patch.object(tiller_controller, 'Tiller')
def test_tiller_releases_with_params(self, mock_tiller):
"""Tests GET /api/v1.0/releases endpoint with query parameters."""
rules = {'tiller:get_release': '@'}
self.policy.set_rules(rules)
def _get_fake_release(name, namespace):
fake_release = mock.Mock(namespace='%s_namespace' % namespace)
fake_release.configure_mock(name=name)
return fake_release
mock_tiller.return_value.list_releases.return_value = [
_get_fake_release('foo', 'bar'), _get_fake_release('baz', 'qux')
]
result = self.app.simulate_get('/api/v1.0/releases',
params_csv=False,
params={'tiller_host': 'fake_host',
'tiller_port': '98765'})
expected = {
'releases': {'bar_namespace': ['foo'], 'qux_namespace': ['baz']}
}
self.assertEqual(expected, result.json)
mock_tiller.assert_called_once_with(tiller_host='fake_host',
tiller_port='98765')
mock_tiller.return_value.list_releases.assert_called_once_with()

70
armada/tests/unit/base.py Normal file
View File

@ -0,0 +1,70 @@
# Copyright 2013 IBM Corp.
# Copyright 2017 AT&T Intellectual Property.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
import fixtures
import mock
from oslo_config import cfg
import testtools
CONF = cfg.CONF
class ArmadaTestCase(testtools.TestCase):
def setUp(self):
super(ArmadaTestCase, self).setUp()
self.useFixture(fixtures.FakeLogger('armada'))
def override_config(self, name, override, group=None):
CONF.set_override(name, override, group)
self.addCleanup(CONF.clear_override, name, group)
def assertEmpty(self, collection):
if isinstance(collection, list):
self.assertEqual(0, len(collection))
elif isinstance(collection, dict):
self.assertEqual(0, len(collection.keys()))
def patch(self, target, autospec=True, **kwargs):
"""Returns a started `mock.patch` object for the supplied target.
The caller may then call the returned patcher to create a mock object.
The caller does not need to call stop() on the returned
patcher object, as this method automatically adds a cleanup
to the test class to stop the patcher.
:param target: String module.class or module.object expression to patch
:param **kwargs: Passed as-is to `mock.patch`. See mock documentation
for details.
"""
p = mock.patch(target, autospec=autospec, **kwargs)
m = p.start()
self.addCleanup(p.stop)
return m
def patchobject(self, target, attribute, new=mock.DEFAULT, autospec=True):
"""Convenient wrapper around `mock.patch.object`
Returns a started mock that will be automatically stopped after the
test ran.
"""
p = mock.patch.object(target, attribute, new, autospec=autospec)
m = p.start()
self.addCleanup(p.stop)
return m

View File

@ -12,12 +12,13 @@
import testtools
import mock
from oslo_policy import policy as common_policy
from armada.common import policy
from armada import conf as cfg
from armada.exceptions import base_exception as exc
import mock
from armada.tests.unit import fixtures
CONF = cfg.CONF
@ -32,6 +33,7 @@ class PolicyTestCase(testtools.TestCase):
"example:allowed": [],
"example:disallowed": [["false:false"]]
}
self.useFixture(fixtures.RealPolicyFixture(False))
self._set_rules()
self.credentials = {}
self.target = {}
@ -46,7 +48,7 @@ class PolicyTestCase(testtools.TestCase):
mock_ctx.to_policy_view.return_value = self.credentials
self.assertRaises(
exc.ActionForbidden, policy.enforce_policy, action,
exc.ActionForbidden, policy._enforce_policy, action,
self.target, mock_ctx)
@mock.patch('armada.api.ArmadaRequestContext')
@ -54,12 +56,12 @@ class PolicyTestCase(testtools.TestCase):
action = "example:allowed"
mock_ctx.to_policy_view.return_value = self.credentials
policy.enforce_policy(action, self.target, mock_ctx)
policy._enforce_policy(action, self.target, mock_ctx)
@mock.patch('armada.api.ArmadaRequestContext')
def test_enforce_bad_action(self, mock_ctx):
action = "example:disallowed"
mock_ctx.to_policy_view.return_value = self.credentials
self.assertRaises(exc.ActionForbidden, policy.enforce_policy,
self.assertRaises(exc.ActionForbidden, policy._enforce_policy,
action, self.target, mock_ctx)

View File

@ -0,0 +1,24 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
policy_data = """
"admin_required": "role:admin"
"armada:create_endpoints": "rule:admin_required"
"armada:validate_manifest": "rule:admin_required"
"armada:test_release": "rule:admin_required"
"armada:test_manifest": "rule:admin_required"
"tiller:get_status": "rule:admin_required"
"tiller:get_release": "rule:admin_required"
"""

View File

@ -0,0 +1,196 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2017 AT&T Intellectual Property.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Fixtures for Armada tests."""
from __future__ import absolute_import
import os
import yaml
import fixtures
import mock
from oslo_config import cfg
from oslo_policy import opts as policy_opts
from oslo_policy import policy as oslo_policy
from armada.common import policies
import armada.common.policy
from armada.tests.unit import fake_policy
CONF = cfg.CONF
class ConfPatcher(fixtures.Fixture):
"""Fixture to patch and restore global CONF.
This also resets overrides for everything that is patched during
it's teardown.
"""
def __init__(self, **kwargs):
"""Constructor
:params group: if specified all config options apply to that group.
:params **kwargs: the rest of the kwargs are processed as a
set of key/value pairs to be set as configuration override.
"""
super(ConfPatcher, self).__init__()
self.group = kwargs.pop('group', None)
self.args = kwargs
def setUp(self):
super(ConfPatcher, self).setUp()
for k, v in self.args.items():
self.addCleanup(CONF.clear_override, k, self.group)
CONF.set_override(k, v, self.group)
class RealPolicyFixture(fixtures.Fixture):
"""Load the live policy for tests.
A base policy fixture that starts with the assumption that you'd
like to load and enforce the shipped default policy in tests.
"""
def __init__(self, verify=True, *args, **kwargs):
"""Constructor for ``RealPolicyFixture``.
:param verify: Whether to verify that expected and actual policies
match. True by default.
"""
super(RealPolicyFixture, self).__init__(*args, **kwargs)
self.verify = verify
def _setUp(self):
super(RealPolicyFixture, self)._setUp()
self.policy_dir = self.useFixture(fixtures.TempDir())
self.policy_file = os.path.join(self.policy_dir.path,
'policy.yaml')
# Load the fake_policy data and add the missing default rules.
policy_rules = yaml.safe_load(fake_policy.policy_data)
self.add_missing_default_rules(policy_rules)
with open(self.policy_file, 'w') as f:
yaml.safe_dump(policy_rules, f)
policy_opts.set_defaults(CONF)
self.useFixture(
ConfPatcher(policy_dirs=[], policy_file=self.policy_file,
group='oslo_policy'))
armada.common.policy.reset_policy()
armada.common.policy.setup_policy()
self.addCleanup(armada.common.policy.reset_policy)
if self.verify:
self._install_policy_verification_hook()
def _verify_policies_match(self):
"""Validate that the expected and actual policies are equivalent.
Otherwise an ``AssertionError`` is raised.
"""
if not (set(self.expected_policy_actions) ==
set(self.actual_policy_actions)):
error_msg = (
'The expected policy actions passed to '
'`self.policy.set_rules` do not match the policy actions '
'that were actually enforced by Armada. Set of expected '
'policies %s should be equal to set of actual policies: %s. '
'There is either a bug with the test or with policy '
'enforcement in the controller.' % (
self.expected_policy_actions,
self.actual_policy_actions)
)
raise AssertionError(error_msg)
def _install_policy_verification_hook(self):
"""Install policy verification hook for validating RBAC.
This function's purpose is to guarantee that policy enforcement is
happening the way we expect it to. It validates that the policies
that are passed to ``self.policy.set_rules`` from within a test that
uses this fixture is a subset of the actual policies that are enforced
by Armada controllers.
The algorithm is as follows:
1) Initialize list of actual policy actions to remember.
2) Initialize list of expected policy actions to remember.
3) Reference a pre-mocked copy of the policy enforcement function
that is ultimately called by Armada for policy enforcement.
4a) Create a hook that stores the actual policy for later.
4b) The hook then calls the *real* policy enforcement function
using the reference from step 3).
5) Mock the policy enforcement function and have it instead call
our hook from step 4a).
6) Add a clean up to undo the mock from step 5).
There is a tight coupling between this function and ``set_rules``
below.
The comparison between ``self.expected_policy_actions`` and
``self.actual_policy_actions`` is performed during clean up.
"""
self.actual_policy_actions = []
self.expected_policy_actions = []
_do_enforce_rbac = armada.common.policy._enforce_policy
def enforce_policy_and_remember_actual_rules(
action, *a, **k):
self.actual_policy_actions.append(action)
_do_enforce_rbac(action, *a, **k)
mock_do_enforce_rbac = mock.patch.object(
armada.common.policy, '_enforce_policy', autospec=True).start()
mock_do_enforce_rbac.side_effect = (
enforce_policy_and_remember_actual_rules)
self.addCleanup(mock.patch.stopall)
self.addCleanup(self._verify_policies_match)
def add_missing_default_rules(self, rules):
"""Adds default rules and their values to the given rules dict.
The given rulen dict may have an incomplete set of policy rules.
This method will add the default policy rules and their values to
the dict. It will not override the existing rules.
"""
for rule in policies.list_rules():
if rule.name not in rules:
rules[rule.name] = rule.check_str
def set_rules(self, rules, overwrite=True):
"""Set the custom policy rules to override.
:param dict rules: Dictionary keyed with policy actions enforced
by Armada whose values are a custom rule understood by
``oslo.policy`` library.
This function overrides the default policy rules with the custom rules
specified by ``rules``. The ``rules`` passed here are added to
``self.expected_policy_actions`` for later comparison with
``self.actual_policy_actions``.
"""
if isinstance(rules, dict):
rules = oslo_policy.Rules.from_dict(rules)
self.expected_policy_actions.extend(rules)
policy = armada.common.policy._ENFORCER
policy.set_rules(rules, overwrite=overwrite)