Deckhand Negative RBAC test scenarios

The framework for being able to do RBAC unit testing
in Deckhand was added here:

    #I86f269a5b616b518e5f742a4005891412226fe2a
    https://review.gerrithub.io/#/c/381205/

This PS expands on that foundation by implementing
negative RBAC tests for the remainder of the Deckhand
APIs. Negative testing means attempting to call APIs
with insufficient permissions and expecting 403s or
empty response bodies, depending on whether the
policy enforcement is critical or conditionally
applied.

Also fixes a minor bug related to returning a deleted
document for the endpoint PUT /api/v1.0/bucket/{bucket_name}/documents

Change-Id: I7ae50f300c1c877c3c162a032611a380f8948065
This commit is contained in:
Felipe Monteiro 2017-10-27 20:18:14 +01:00
parent da4b33b2a8
commit b22fa5d2f3
14 changed files with 553 additions and 42 deletions

7
.gitignore vendored
View File

@ -49,6 +49,7 @@ coverage.xml
.testrepository/*
cover/*
results/*
.stestr/
# Translations
*.mo
@ -66,7 +67,8 @@ instance/
.scrapy
# Sphinx documentation
docs/_build/
doc/_build/
doc/source/_static/
# PyBuilder
target/
@ -106,3 +108,6 @@ ENV/
# makefile build/lint artifacts
/charts/deckhand/*
# git
Changelog

View File

@ -319,7 +319,7 @@ class DocumentSecretFactory(DeckhandFactory):
},
"metadata": {
"schema": "metadata/Document/v1",
"name": "application-api",
"name": "",
"storagePolicy": ""
},
"schema": "deckhand/%s/v1"
@ -349,9 +349,11 @@ class DocumentSecretFactory(DeckhandFactory):
def gen(self):
raise NotImplementedError()
def gen_test(self, schema, storage_policy, data=None):
def gen_test(self, schema, storage_policy, data=None, name=None):
if data is None:
data = test_utils.rand_password()
if name is None:
name = test_utils.rand_name('document')
document_secret_template = copy.deepcopy(self.DOCUMENT_SECRET_TEMPLATE)
@ -359,6 +361,7 @@ class DocumentSecretFactory(DeckhandFactory):
document_secret_template['schema'] = (
document_secret_template['schema'] % schema)
document_secret_template['data'] = data
document_secret_template['metadata']['name'] = name
return document_secret_template

View File

@ -31,3 +31,19 @@ class BaseControllerTest(test_base.DeckhandWithDBTestCase,
# NOTE: allow_anonymous_access allows these unit tests to get around
# Keystone authentication.
self.useFixture(fixtures.ConfPatcher(allow_anonymous_access=True))
def tearDown(self):
super(BaseControllerTest, self).tearDown()
# Validate whether policy enforcement happened the way we expected it
# to. This check is really only meaningful if ``self.policy.set_rules``
# is used within the context of a test that inherits from this class.
self.assertTrue(
set(self.policy.expected_policy_actions) ==
set(self.policy.actual_policy_actions),
'The expected policy actions passed to ``self.policy.set_rules`` '
'do not match the policy actions that were actually enforced by '
'Deckhand. Set of expected policies %s should be equal to set of '
'actual policies: %s. There is either a bug with the test or with '
'policy enforcement in the controller.' % (
self.policy.expected_policy_actions,
self.policy.actual_policy_actions))

View File

@ -26,7 +26,7 @@ CONF = cfg.CONF
class TestBucketsController(test_base.BaseControllerTest):
"""Test suite for validating positive scenarios for bucket controller."""
"""Test suite for validating positive scenarios for buckets controller."""
def test_put_bucket(self):
rules = {'deckhand:create_cleartext_documents': '@'}
@ -62,6 +62,7 @@ class TestBucketsController(test_base.BaseControllerTest):
body=yaml.safe_dump_all(payload))
self.assertEqual(200, resp.status_code)
created_documents = list(yaml.safe_load_all(resp.text))
self.assertEqual(1, len(created_documents))
expected = sorted([(d['schema'], d['metadata']['name'])
for d in payload])
@ -180,21 +181,9 @@ class TestBucketsControllerNegativeRBAC(test_base.BaseControllerTest):
body=yaml.safe_dump_all(payload))
self.assertEqual(403, resp.status_code)
def test_put_bucket_cleartext_secret_except_forbidden(self):
rules = {'deckhand:create_cleartext_documents': 'rule:admin_api'}
self.policy.set_rules(rules)
secrets_factory = factories.DocumentSecretFactory()
payload = [secrets_factory.gen_test('Certificate', 'cleartext')]
resp = self.app.simulate_put(
'/api/v1.0/buckets/mop/documents',
headers={'Content-Type': 'application/x-yaml'},
body=yaml.safe_dump_all(payload))
self.assertEqual(403, resp.status_code)
def test_put_bucket_encrypted_secret_except_forbidden(self):
rules = {'deckhand:create_encrypted_documents': 'rule:admin_api'}
def test_put_bucket_encrypted_document_except_forbidden(self):
rules = {'deckhand:create_encrypted_documents': 'rule:admin_api',
'deckhand:create_cleartext_documents': '@'}
self.policy.set_rules(rules)
secrets_factory = factories.DocumentSecretFactory()

View File

@ -31,8 +31,9 @@ class TestErrorFormatting(test_base.BaseControllerTest):
"""Verify formatting for an exception class that inherits from
:class:`Exception`.
"""
with mock.patch.object(policy, '_do_enforce_rbac', autospec=True) \
as m_enforce_rbac:
with mock.patch.object(
policy, '_do_enforce_rbac',
spec_set=policy._do_enforce_rbac) as m_enforce_rbac:
m_enforce_rbac.side_effect = Exception
resp = self.app.simulate_put(
'/api/v1.0/buckets/test/documents',
@ -70,8 +71,9 @@ class TestErrorFormatting(test_base.BaseControllerTest):
expected_msg = (
'deckhand:create_cleartext_documents is disallowed by policy')
with mock.patch.object(policy, '_do_enforce_rbac', autospec=True) \
as m_enforce_rbac:
with mock.patch.object(
policy, '_do_enforce_rbac',
spec_set=policy._do_enforce_rbac) as m_enforce_rbac:
m_enforce_rbac.side_effect = falcon.HTTPForbidden(
description=expected_msg)
resp = self.app.simulate_put(

View File

@ -0,0 +1,79 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml
import mock
from deckhand.control import buckets
from deckhand import factories
from deckhand.tests.unit.control import base as test_base
class TestRenderedDocumentsControllerNegativeRBAC(
test_base.BaseControllerTest):
"""Test suite for validating negative RBAC scenarios for rendered documents
controller.
"""
def test_list_cleartext_revision_documents_insufficient_permissions(self):
rules = {'deckhand:list_cleartext_documents': 'rule:admin_api',
'deckhand:create_cleartext_documents': '@'}
self.policy.set_rules(rules)
# Create a document for a bucket.
secrets_factory = factories.DocumentSecretFactory()
payload = [secrets_factory.gen_test('Certificate', 'cleartext')]
resp = self.app.simulate_put(
'/api/v1.0/buckets/mop/documents',
headers={'Content-Type': 'application/x-yaml'},
body=yaml.safe_dump_all(payload))
self.assertEqual(200, resp.status_code)
revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
'revision']
# Verify that the created document was not returned.
resp = self.app.simulate_get(
'/api/v1.0/revisions/%s/rendered-documents' % revision_id,
headers={'Content-Type': 'application/x-yaml'})
self.assertEqual(403, resp.status_code)
def test_list_encrypted_revision_documents_insufficient_permissions(self):
rules = {'deckhand:list_cleartext_documents': '@',
'deckhand:list_encrypted_documents': 'rule:admin_api',
'deckhand:create_cleartext_documents': '@',
'deckhand:create_encrypted_documents': '@'}
self.policy.set_rules(rules)
# Create a document for a bucket.
secrets_factory = factories.DocumentSecretFactory()
payload = [secrets_factory.gen_test('Certificate', 'encrypted')]
with mock.patch.object(buckets.BucketsResource, 'secrets_mgr',
autospec=True) as mock_secrets_mgr:
mock_secrets_mgr.create.return_value = {
'secret': payload[0]['data']}
resp = self.app.simulate_put(
'/api/v1.0/buckets/mop/documents',
headers={'Content-Type': 'application/x-yaml'},
body=yaml.safe_dump_all(payload))
self.assertEqual(200, resp.status_code)
revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
'revision']
# Verify that the created document was not returned.
resp = self.app.simulate_get(
'/api/v1.0/revisions/%s/rendered-documents' % revision_id,
headers={'Content-Type': 'application/x-yaml'})
self.assertEqual(200, resp.status_code)
self.assertEmpty(list(yaml.safe_load_all(resp.text)))

View File

@ -0,0 +1,82 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml
import mock
from deckhand.control import buckets
from deckhand import factories
from deckhand.tests.unit.control import base as test_base
class TestRevisionDocumentsControllerNegativeRBAC(
test_base.BaseControllerTest):
"""Test suite for validating negative RBAC scenarios for revision documents
controller.
For these tests, if policy enforcement fails, the response body should be
empty.
"""
def test_list_cleartext_revision_documents_insufficient_permissions(self):
rules = {'deckhand:list_cleartext_documents': 'rule:admin_api',
'deckhand:create_cleartext_documents': '@'}
self.policy.set_rules(rules)
# Create a document for a bucket.
secrets_factory = factories.DocumentSecretFactory()
payload = [secrets_factory.gen_test('Certificate', 'cleartext')]
resp = self.app.simulate_put(
'/api/v1.0/buckets/mop/documents',
headers={'Content-Type': 'application/x-yaml'},
body=yaml.safe_dump_all(payload))
self.assertEqual(200, resp.status_code)
revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
'revision']
# Verify that the created document was not returned.
resp = self.app.simulate_get(
'/api/v1.0/revisions/%s/documents' % revision_id,
headers={'Content-Type': 'application/x-yaml'})
self.assertEqual(403, resp.status_code)
def test_list_encrypted_revision_documents_insufficient_permissions(self):
rules = {'deckhand:list_cleartext_documents': '@',
'deckhand:list_encrypted_documents': 'rule:admin_api',
'deckhand:create_cleartext_documents': '@',
'deckhand:create_encrypted_documents': '@'}
self.policy.set_rules(rules)
# Create a document for a bucket.
secrets_factory = factories.DocumentSecretFactory()
payload = [secrets_factory.gen_test('Certificate', 'encrypted')]
with mock.patch.object(buckets.BucketsResource, 'secrets_mgr',
autospec=True) as mock_secrets_mgr:
mock_secrets_mgr.create.return_value = {
'secret': payload[0]['data']}
resp = self.app.simulate_put(
'/api/v1.0/buckets/mop/documents',
headers={'Content-Type': 'application/x-yaml'},
body=yaml.safe_dump_all(payload))
self.assertEqual(200, resp.status_code)
revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
'revision']
# Verify that the created document was not returned.
resp = self.app.simulate_get(
'/api/v1.0/revisions/%s/documents' % revision_id,
headers={'Content-Type': 'application/x-yaml'})
self.assertEqual(200, resp.status_code)
self.assertEmpty(list(yaml.safe_load_all(resp.text)))

View File

@ -0,0 +1,97 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml
from deckhand import factories
from deckhand.tests.unit.control import base as test_base
class TestRevisionTagsControllerNegativeRBAC(test_base.BaseControllerTest):
"""Test suite for validating negative RBAC scenarios for revision tags
controller.
"""
def setUp(self):
super(TestRevisionTagsControllerNegativeRBAC, self).setUp()
rules = {'deckhand:create_cleartext_documents': '@'}
self.policy.set_rules(rules)
# Create a revision to tag.
secrets_factory = factories.DocumentSecretFactory()
payload = [secrets_factory.gen_test('Certificate', 'cleartext')]
resp = self.app.simulate_put(
'/api/v1.0/buckets/mop/documents',
headers={'Content-Type': 'application/x-yaml'},
body=yaml.safe_dump_all(payload))
self.assertEqual(200, resp.status_code)
self.revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
'revision']
def test_revision_list_tags_except_forbidden(self):
rules = {'deckhand:list_tags': 'rule:admin_api'}
self.policy.set_rules(rules)
resp = self.app.simulate_get(
'/api/v1.0/revisions/%s/tags' % self.revision_id,
headers={'Content-Type': 'application/x-yaml'})
self.assertEqual(403, resp.status_code)
def test_revision_show_tag_except_forbidden(self):
rules = {'deckhand:create_tag': '@',
'deckhand:show_tag': 'rule:admin_api'}
self.policy.set_rules(rules)
resp = self.app.simulate_post(
'/api/v1.0/revisions/%s/tags/%s' % (self.revision_id, 'test'),
headers={'Content-Type': 'application/x-yaml'})
self.assertEqual(201, resp.status_code)
resp = self.app.simulate_get(
'/api/v1.0/revisions/%s/tags/test' % self.revision_id,
headers={'Content-Type': 'application/x-yaml'})
self.assertEqual(403, resp.status_code)
def test_revision_create_tag_except_forbidden(self):
rules = {'deckhand:create_tag': 'rule:admin_api'}
self.policy.set_rules(rules)
resp = self.app.simulate_post(
'/api/v1.0/revisions/%s/tags/%s' % (self.revision_id, 'test'),
headers={'Content-Type': 'application/x-yaml'})
self.assertEqual(403, resp.status_code)
def test_revision_delete_tag_except_forbidden(self):
rules = {'deckhand:create_tag': '@',
'deckhand:delete_tag': 'rule:admin_api'}
self.policy.set_rules(rules)
resp = self.app.simulate_post(
'/api/v1.0/revisions/%s/tags/%s' % (self.revision_id, 'test'),
headers={'Content-Type': 'application/x-yaml'})
self.assertEqual(201, resp.status_code)
resp = self.app.simulate_delete(
'/api/v1.0/revisions/%s/tags/%s' % (self.revision_id, 'test'),
headers={'Content-Type': 'application/x-yaml'})
self.assertEqual(403, resp.status_code)
def test_revision_delete_all_tags_except_forbidden(self):
rules = {'deckhand:delete_tags': 'rule:admin_api'}
self.policy.set_rules(rules)
resp = self.app.simulate_delete(
'/api/v1.0/revisions/%s/tags' % self.revision_id,
headers={'Content-Type': 'application/x-yaml'})
self.assertEqual(403, resp.status_code)

View File

@ -0,0 +1,66 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml
from deckhand import factories
from deckhand.tests.unit.control import base as test_base
class TestRevisionsControllerNegativeRBAC(test_base.BaseControllerTest):
"""Test suite for validating negative RBAC scenarios for revisions
controller.
"""
def test_list_revisions_except_forbidden(self):
rules = {'deckhand:list_revisions': 'rule:admin_api'}
self.policy.set_rules(rules)
resp = self.app.simulate_get(
'/api/v1.0/revisions',
headers={'Content-Type': 'application/x-yaml'})
self.assertEqual(403, resp.status_code)
def test_show_revision_except_forbidden(self):
rules = {'deckhand:show_revision': 'rule:admin_api',
'deckhand:create_cleartext_documents': '@'}
self.policy.set_rules(rules)
# Create a bucket with a document to generate a revision_id.
secrets_factory = factories.DocumentSecretFactory()
payload = [secrets_factory.gen_test('Certificate', 'cleartext')]
resp = self.app.simulate_put(
'/api/v1.0/buckets/mop/documents',
headers={'Content-Type': 'application/x-yaml'},
body=yaml.safe_dump_all(payload))
self.assertEqual(200, resp.status_code)
revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
'revision']
# Then try to query the revision "show" endpoint with insufficient
# permissions.
resp = self.app.simulate_get(
'/api/v1.0/revisions/%s' % revision_id,
headers={'Content-Type': 'application/x-yaml'})
self.assertEqual(403, resp.status_code)
def test_delete_revisions_except_forbidden(self):
rules = {'deckhand:delete_revisions': 'rule:admin_api'}
self.policy.set_rules(rules)
resp = self.app.simulate_delete(
'/api/v1.0/revisions',
headers={'Content-Type': 'application/x-yaml'})
self.assertEqual(403, resp.status_code)

View File

@ -0,0 +1,30 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from deckhand.tests.unit.control import base as test_base
class TestRevisionsDiffControllerNegativeRBAC(test_base.BaseControllerTest):
"""Test suite for validating negative RBAC scenarios for revisions diff
controller.
"""
def test_show_revision_diff_except_forbidden(self):
rules = {'deckhand:show_revision_diff': 'rule:admin_api'}
self.policy.set_rules(rules)
resp = self.app.simulate_get(
'/api/v1.0/revisions/0/diff/0',
headers={'Content-Type': 'application/x-yaml'})
self.assertEqual(403, resp.status_code)

View File

@ -0,0 +1,81 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml
import mock
from deckhand.control import buckets
from deckhand import factories
from deckhand.tests.unit.control import base as test_base
class TestRevisionsRollbackControllerNegativeRBAC(
test_base.BaseControllerTest):
"""Test suite for validating negative RBAC scenarios for revisions rollback
controller.
"""
def test_revision_rollback_cleartext_except_forbidden(self):
rules = {'deckhand:create_cleartext_documents': '@'}
self.policy.set_rules(rules)
# Create a revision so we have something to roll back to.
secrets_factory = factories.DocumentSecretFactory()
payload = [secrets_factory.gen_test('Certificate', 'cleartext')]
resp = self.app.simulate_put(
'/api/v1.0/buckets/mop/documents',
headers={'Content-Type': 'application/x-yaml'},
body=yaml.safe_dump_all(payload))
self.assertEqual(200, resp.status_code)
revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
'revision']
rules = {'deckhand:create_cleartext_documents': 'rule:admin_api'}
self.policy.set_rules(rules)
resp = self.app.simulate_post(
'/api/v1.0/rollback/%s' % revision_id,
headers={'Content-Type': 'application/x-yaml'})
self.assertEqual(403, resp.status_code)
def test_revision_rollback_encrypted_except_forbidden(self):
rules = {'deckhand:create_encrypted_documents': '@',
'deckhand:create_cleartext_documents': '@'}
self.policy.set_rules(rules)
# Create a revision so we have something to roll back to.
secrets_factory = factories.DocumentSecretFactory()
payload = [secrets_factory.gen_test('Certificate', 'encrypted')]
with mock.patch.object(buckets.BucketsResource, 'secrets_mgr',
autospec=True) as mock_secrets_mgr:
mock_secrets_mgr.create.return_value = {
'secret': payload[0]['data']}
resp = self.app.simulate_put(
'/api/v1.0/buckets/mop/documents',
headers={'Content-Type': 'application/x-yaml'},
body=yaml.safe_dump_all(payload))
self.assertEqual(200, resp.status_code)
revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
'revision']
rules = {'deckhand:create_encrypted_documents': 'rule:admin_api',
'deckhand:create_cleartext_documents': '@'}
self.policy.set_rules(rules)
resp = self.app.simulate_post(
'/api/v1.0/rollback/%s' % revision_id,
headers={'Content-Type': 'application/x-yaml'})
self.assertEqual(403, resp.status_code)

View File

@ -512,7 +512,6 @@ class TestValidationsController(test_base.BaseControllerTest):
depends on substitution from another document.
"""
rules = {'deckhand:create_cleartext_documents': '@',
'deckhand:create_validation': '@',
'deckhand:list_validations': '@'}
self.policy.set_rules(rules)

View File

@ -147,52 +147,55 @@ class TestDocuments(base.TestDbBase):
rand_secret = {'secret': test_utils.rand_password()}
bucket_name = test_utils.rand_name('bucket')
for storage_policy in ('encrypted', 'cleartext'):
for expected_len, storage_policy in enumerate(
('encrypted', 'cleartext')):
secret_doc_payload = self.secrets_factory.gen_test(
'Certificate', storage_policy, rand_secret)
created_documents = self.create_documents(
bucket_name, secret_doc_payload)
self.assertEqual(1, len(created_documents))
self.assertIn('Certificate', created_documents[0]['schema'])
self.assertEqual(storage_policy, created_documents[0][
self.assertEqual(expected_len + 1, len(created_documents))
self.assertIn('Certificate', created_documents[-1]['schema'])
self.assertEqual(storage_policy, created_documents[-1][
'metadata']['storagePolicy'])
self.assertTrue(created_documents[0]['is_secret'])
self.assertEqual(rand_secret, created_documents[0]['data'])
self.assertTrue(created_documents[-1]['is_secret'])
self.assertEqual(rand_secret, created_documents[-1]['data'])
def test_create_certificate_key(self):
rand_secret = {'secret': test_utils.rand_password()}
bucket_name = test_utils.rand_name('bucket')
for storage_policy in ('encrypted', 'cleartext'):
for expected_len, storage_policy in enumerate(
('encrypted', 'cleartext')):
secret_doc_payload = self.secrets_factory.gen_test(
'CertificateKey', storage_policy, rand_secret)
created_documents = self.create_documents(
bucket_name, secret_doc_payload)
self.assertEqual(1, len(created_documents))
self.assertIn('CertificateKey', created_documents[0]['schema'])
self.assertEqual(storage_policy, created_documents[0][
self.assertEqual(expected_len + 1, len(created_documents))
self.assertIn('CertificateKey', created_documents[-1]['schema'])
self.assertEqual(storage_policy, created_documents[-1][
'metadata']['storagePolicy'])
self.assertTrue(created_documents[0]['is_secret'])
self.assertEqual(rand_secret, created_documents[0]['data'])
self.assertTrue(created_documents[-1]['is_secret'])
self.assertEqual(rand_secret, created_documents[-1]['data'])
def test_create_passphrase(self):
rand_secret = {'secret': test_utils.rand_password()}
bucket_name = test_utils.rand_name('bucket')
for storage_policy in ('encrypted', 'cleartext'):
for expected_len, storage_policy in enumerate(
('encrypted', 'cleartext')):
secret_doc_payload = self.secrets_factory.gen_test(
'Passphrase', storage_policy, rand_secret)
created_documents = self.create_documents(
bucket_name, secret_doc_payload)
self.assertEqual(1, len(created_documents))
self.assertIn('Passphrase', created_documents[0]['schema'])
self.assertEqual(storage_policy, created_documents[0][
self.assertEqual(expected_len + 1, len(created_documents))
self.assertIn('Passphrase', created_documents[-1]['schema'])
self.assertEqual(storage_policy, created_documents[-1][
'metadata']['storagePolicy'])
self.assertTrue(created_documents[0]['is_secret'])
self.assertEqual(rand_secret, created_documents[0]['data'])
self.assertTrue(created_documents[-1]['is_secret'])
self.assertEqual(rand_secret, created_documents[-1]['data'])
def test_delete_document(self):
payload = base.DocumentFixture.get_minimal_fixture()

View File

@ -19,6 +19,7 @@ import os
import yaml
import fixtures
import mock
from oslo_config import cfg
from oslo_policy import opts as policy_opts
from oslo_policy import policy as oslo_policy
@ -84,6 +85,51 @@ class RealPolicyFixture(fixtures.Fixture):
deckhand.policy.reset()
deckhand.policy.init()
self.addCleanup(deckhand.policy.reset)
self._install_policy_verification_hook()
def _install_policy_verification_hook(self):
"""Install policy verification hook for validating RBAC.
This function's purpose is to guarantee that policy enforcement is
happening the way we expect it to. It validates that the policies
that are passed to ``self.policy.set_rules`` from within a test that
uses this fixture is a subset of the actual policies that are enforced
by Deckhand controllers.
The algorithm is as follows:
1) Initialize list of actual policy actions to remember.
2) Initialize list of expected policy actions to remember.
3) Reference a pre-mocked copy of the policy enforcement function
that is ultimately called by Deckhand for policy enforcement.
4a) Create a hook that stores the actual policy for later.
4b) The hook then calls the *real* policy enforcement function
using the reference from step 3).
5) Mock the policy enforcement function and have it instead call
our hook from step 4a).
6) Add a clean up to undo the mock from step 5).
There is a tight coupling between this function and ``set_rules``
below.
The comparison between ``self.expected_policy_actions`` and
``self.actual_policy_actions`` should be done in the ``tearDown``
function of the class that uses this fixture.
"""
self.actual_policy_actions = []
self.expected_policy_actions = []
_do_enforce_rbac = deckhand.policy._do_enforce_rbac
def enforce_policy_and_remember_actual_rules(
action, *a, **k):
self.actual_policy_actions.append(action)
_do_enforce_rbac(action, *a, **k)
mock_do_enforce_rbac = mock.patch.object(
deckhand.policy, '_do_enforce_rbac', autospec=True).start()
mock_do_enforce_rbac.side_effect = (
enforce_policy_and_remember_actual_rules)
self.addCleanup(mock.patch.stopall)
def add_missing_default_rules(self, rules):
"""Adds default rules and their values to the given rules dict.
@ -97,8 +143,21 @@ class RealPolicyFixture(fixtures.Fixture):
rules[rule.name] = rule.check_str
def set_rules(self, rules, overwrite=True):
"""Set the custom policy rules to override.
:param dict rules: Dictionary keyed with policy actions enforced
by Deckhand whose values are a custom rule understood by
``oslo.policy`` library.
This function overrides the default policy rules with the custom rules
specified by ``rules``. The ``rules`` passed here are added to
``self.expected_policy_actions`` for later comparison with
``self.actual_policy_actions``.
"""
if isinstance(rules, dict):
rules = oslo_policy.Rules.from_dict(rules)
self.expected_policy_actions.extend(rules)
policy = deckhand.policy._ENFORCER
policy.set_rules(rules, overwrite=overwrite)