Merge "Deckhand Negative RBAC test scenarios"
This commit is contained in:
commit
1ef4a78f02
|
@ -49,6 +49,7 @@ coverage.xml
|
|||
.testrepository/*
|
||||
cover/*
|
||||
results/*
|
||||
.stestr/
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
|
@ -66,7 +67,8 @@ instance/
|
|||
.scrapy
|
||||
|
||||
# Sphinx documentation
|
||||
docs/_build/
|
||||
doc/_build/
|
||||
doc/source/_static/
|
||||
|
||||
# PyBuilder
|
||||
target/
|
||||
|
@ -106,3 +108,6 @@ ENV/
|
|||
|
||||
# makefile build/lint artifacts
|
||||
/charts/deckhand/*
|
||||
|
||||
# git
|
||||
Changelog
|
||||
|
|
|
@ -319,7 +319,7 @@ class DocumentSecretFactory(DeckhandFactory):
|
|||
},
|
||||
"metadata": {
|
||||
"schema": "metadata/Document/v1",
|
||||
"name": "application-api",
|
||||
"name": "",
|
||||
"storagePolicy": ""
|
||||
},
|
||||
"schema": "deckhand/%s/v1"
|
||||
|
@ -349,9 +349,11 @@ class DocumentSecretFactory(DeckhandFactory):
|
|||
def gen(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
def gen_test(self, schema, storage_policy, data=None):
|
||||
def gen_test(self, schema, storage_policy, data=None, name=None):
|
||||
if data is None:
|
||||
data = test_utils.rand_password()
|
||||
if name is None:
|
||||
name = test_utils.rand_name('document')
|
||||
|
||||
document_secret_template = copy.deepcopy(self.DOCUMENT_SECRET_TEMPLATE)
|
||||
|
||||
|
@ -359,6 +361,7 @@ class DocumentSecretFactory(DeckhandFactory):
|
|||
document_secret_template['schema'] = (
|
||||
document_secret_template['schema'] % schema)
|
||||
document_secret_template['data'] = data
|
||||
document_secret_template['metadata']['name'] = name
|
||||
|
||||
return document_secret_template
|
||||
|
||||
|
|
|
@ -31,3 +31,19 @@ class BaseControllerTest(test_base.DeckhandWithDBTestCase,
|
|||
# NOTE: allow_anonymous_access allows these unit tests to get around
|
||||
# Keystone authentication.
|
||||
self.useFixture(fixtures.ConfPatcher(allow_anonymous_access=True))
|
||||
|
||||
def tearDown(self):
|
||||
super(BaseControllerTest, self).tearDown()
|
||||
# Validate whether policy enforcement happened the way we expected it
|
||||
# to. This check is really only meaningful if ``self.policy.set_rules``
|
||||
# is used within the context of a test that inherits from this class.
|
||||
self.assertTrue(
|
||||
set(self.policy.expected_policy_actions) ==
|
||||
set(self.policy.actual_policy_actions),
|
||||
'The expected policy actions passed to ``self.policy.set_rules`` '
|
||||
'do not match the policy actions that were actually enforced by '
|
||||
'Deckhand. Set of expected policies %s should be equal to set of '
|
||||
'actual policies: %s. There is either a bug with the test or with '
|
||||
'policy enforcement in the controller.' % (
|
||||
self.policy.expected_policy_actions,
|
||||
self.policy.actual_policy_actions))
|
||||
|
|
|
@ -26,7 +26,7 @@ CONF = cfg.CONF
|
|||
|
||||
|
||||
class TestBucketsController(test_base.BaseControllerTest):
|
||||
"""Test suite for validating positive scenarios for bucket controller."""
|
||||
"""Test suite for validating positive scenarios for buckets controller."""
|
||||
|
||||
def test_put_bucket(self):
|
||||
rules = {'deckhand:create_cleartext_documents': '@'}
|
||||
|
@ -62,6 +62,7 @@ class TestBucketsController(test_base.BaseControllerTest):
|
|||
body=yaml.safe_dump_all(payload))
|
||||
self.assertEqual(200, resp.status_code)
|
||||
created_documents = list(yaml.safe_load_all(resp.text))
|
||||
|
||||
self.assertEqual(1, len(created_documents))
|
||||
expected = sorted([(d['schema'], d['metadata']['name'])
|
||||
for d in payload])
|
||||
|
@ -180,21 +181,9 @@ class TestBucketsControllerNegativeRBAC(test_base.BaseControllerTest):
|
|||
body=yaml.safe_dump_all(payload))
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
def test_put_bucket_cleartext_secret_except_forbidden(self):
|
||||
rules = {'deckhand:create_cleartext_documents': 'rule:admin_api'}
|
||||
self.policy.set_rules(rules)
|
||||
|
||||
secrets_factory = factories.DocumentSecretFactory()
|
||||
payload = [secrets_factory.gen_test('Certificate', 'cleartext')]
|
||||
|
||||
resp = self.app.simulate_put(
|
||||
'/api/v1.0/buckets/mop/documents',
|
||||
headers={'Content-Type': 'application/x-yaml'},
|
||||
body=yaml.safe_dump_all(payload))
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
def test_put_bucket_encrypted_secret_except_forbidden(self):
|
||||
rules = {'deckhand:create_encrypted_documents': 'rule:admin_api'}
|
||||
def test_put_bucket_encrypted_document_except_forbidden(self):
|
||||
rules = {'deckhand:create_encrypted_documents': 'rule:admin_api',
|
||||
'deckhand:create_cleartext_documents': '@'}
|
||||
self.policy.set_rules(rules)
|
||||
|
||||
secrets_factory = factories.DocumentSecretFactory()
|
||||
|
|
|
@ -31,8 +31,9 @@ class TestErrorFormatting(test_base.BaseControllerTest):
|
|||
"""Verify formatting for an exception class that inherits from
|
||||
:class:`Exception`.
|
||||
"""
|
||||
with mock.patch.object(policy, '_do_enforce_rbac', autospec=True) \
|
||||
as m_enforce_rbac:
|
||||
with mock.patch.object(
|
||||
policy, '_do_enforce_rbac',
|
||||
spec_set=policy._do_enforce_rbac) as m_enforce_rbac:
|
||||
m_enforce_rbac.side_effect = Exception
|
||||
resp = self.app.simulate_put(
|
||||
'/api/v1.0/buckets/test/documents',
|
||||
|
@ -70,8 +71,9 @@ class TestErrorFormatting(test_base.BaseControllerTest):
|
|||
expected_msg = (
|
||||
'deckhand:create_cleartext_documents is disallowed by policy')
|
||||
|
||||
with mock.patch.object(policy, '_do_enforce_rbac', autospec=True) \
|
||||
as m_enforce_rbac:
|
||||
with mock.patch.object(
|
||||
policy, '_do_enforce_rbac',
|
||||
spec_set=policy._do_enforce_rbac) as m_enforce_rbac:
|
||||
m_enforce_rbac.side_effect = falcon.HTTPForbidden(
|
||||
description=expected_msg)
|
||||
resp = self.app.simulate_put(
|
||||
|
|
|
@ -0,0 +1,79 @@
|
|||
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import yaml
|
||||
|
||||
import mock
|
||||
|
||||
from deckhand.control import buckets
|
||||
from deckhand import factories
|
||||
from deckhand.tests.unit.control import base as test_base
|
||||
|
||||
|
||||
class TestRenderedDocumentsControllerNegativeRBAC(
|
||||
test_base.BaseControllerTest):
|
||||
"""Test suite for validating negative RBAC scenarios for rendered documents
|
||||
controller.
|
||||
"""
|
||||
|
||||
def test_list_cleartext_revision_documents_insufficient_permissions(self):
|
||||
rules = {'deckhand:list_cleartext_documents': 'rule:admin_api',
|
||||
'deckhand:create_cleartext_documents': '@'}
|
||||
self.policy.set_rules(rules)
|
||||
|
||||
# Create a document for a bucket.
|
||||
secrets_factory = factories.DocumentSecretFactory()
|
||||
payload = [secrets_factory.gen_test('Certificate', 'cleartext')]
|
||||
resp = self.app.simulate_put(
|
||||
'/api/v1.0/buckets/mop/documents',
|
||||
headers={'Content-Type': 'application/x-yaml'},
|
||||
body=yaml.safe_dump_all(payload))
|
||||
self.assertEqual(200, resp.status_code)
|
||||
revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
|
||||
'revision']
|
||||
|
||||
# Verify that the created document was not returned.
|
||||
resp = self.app.simulate_get(
|
||||
'/api/v1.0/revisions/%s/rendered-documents' % revision_id,
|
||||
headers={'Content-Type': 'application/x-yaml'})
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
def test_list_encrypted_revision_documents_insufficient_permissions(self):
|
||||
rules = {'deckhand:list_cleartext_documents': '@',
|
||||
'deckhand:list_encrypted_documents': 'rule:admin_api',
|
||||
'deckhand:create_cleartext_documents': '@',
|
||||
'deckhand:create_encrypted_documents': '@'}
|
||||
self.policy.set_rules(rules)
|
||||
|
||||
# Create a document for a bucket.
|
||||
secrets_factory = factories.DocumentSecretFactory()
|
||||
payload = [secrets_factory.gen_test('Certificate', 'encrypted')]
|
||||
with mock.patch.object(buckets.BucketsResource, 'secrets_mgr',
|
||||
autospec=True) as mock_secrets_mgr:
|
||||
mock_secrets_mgr.create.return_value = {
|
||||
'secret': payload[0]['data']}
|
||||
resp = self.app.simulate_put(
|
||||
'/api/v1.0/buckets/mop/documents',
|
||||
headers={'Content-Type': 'application/x-yaml'},
|
||||
body=yaml.safe_dump_all(payload))
|
||||
self.assertEqual(200, resp.status_code)
|
||||
revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
|
||||
'revision']
|
||||
|
||||
# Verify that the created document was not returned.
|
||||
resp = self.app.simulate_get(
|
||||
'/api/v1.0/revisions/%s/rendered-documents' % revision_id,
|
||||
headers={'Content-Type': 'application/x-yaml'})
|
||||
self.assertEqual(200, resp.status_code)
|
||||
self.assertEmpty(list(yaml.safe_load_all(resp.text)))
|
|
@ -0,0 +1,82 @@
|
|||
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import yaml
|
||||
|
||||
import mock
|
||||
|
||||
from deckhand.control import buckets
|
||||
from deckhand import factories
|
||||
from deckhand.tests.unit.control import base as test_base
|
||||
|
||||
|
||||
class TestRevisionDocumentsControllerNegativeRBAC(
|
||||
test_base.BaseControllerTest):
|
||||
"""Test suite for validating negative RBAC scenarios for revision documents
|
||||
controller.
|
||||
|
||||
For these tests, if policy enforcement fails, the response body should be
|
||||
empty.
|
||||
"""
|
||||
|
||||
def test_list_cleartext_revision_documents_insufficient_permissions(self):
|
||||
rules = {'deckhand:list_cleartext_documents': 'rule:admin_api',
|
||||
'deckhand:create_cleartext_documents': '@'}
|
||||
self.policy.set_rules(rules)
|
||||
|
||||
# Create a document for a bucket.
|
||||
secrets_factory = factories.DocumentSecretFactory()
|
||||
payload = [secrets_factory.gen_test('Certificate', 'cleartext')]
|
||||
resp = self.app.simulate_put(
|
||||
'/api/v1.0/buckets/mop/documents',
|
||||
headers={'Content-Type': 'application/x-yaml'},
|
||||
body=yaml.safe_dump_all(payload))
|
||||
self.assertEqual(200, resp.status_code)
|
||||
revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
|
||||
'revision']
|
||||
|
||||
# Verify that the created document was not returned.
|
||||
resp = self.app.simulate_get(
|
||||
'/api/v1.0/revisions/%s/documents' % revision_id,
|
||||
headers={'Content-Type': 'application/x-yaml'})
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
def test_list_encrypted_revision_documents_insufficient_permissions(self):
|
||||
rules = {'deckhand:list_cleartext_documents': '@',
|
||||
'deckhand:list_encrypted_documents': 'rule:admin_api',
|
||||
'deckhand:create_cleartext_documents': '@',
|
||||
'deckhand:create_encrypted_documents': '@'}
|
||||
self.policy.set_rules(rules)
|
||||
|
||||
# Create a document for a bucket.
|
||||
secrets_factory = factories.DocumentSecretFactory()
|
||||
payload = [secrets_factory.gen_test('Certificate', 'encrypted')]
|
||||
with mock.patch.object(buckets.BucketsResource, 'secrets_mgr',
|
||||
autospec=True) as mock_secrets_mgr:
|
||||
mock_secrets_mgr.create.return_value = {
|
||||
'secret': payload[0]['data']}
|
||||
resp = self.app.simulate_put(
|
||||
'/api/v1.0/buckets/mop/documents',
|
||||
headers={'Content-Type': 'application/x-yaml'},
|
||||
body=yaml.safe_dump_all(payload))
|
||||
self.assertEqual(200, resp.status_code)
|
||||
revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
|
||||
'revision']
|
||||
|
||||
# Verify that the created document was not returned.
|
||||
resp = self.app.simulate_get(
|
||||
'/api/v1.0/revisions/%s/documents' % revision_id,
|
||||
headers={'Content-Type': 'application/x-yaml'})
|
||||
self.assertEqual(200, resp.status_code)
|
||||
self.assertEmpty(list(yaml.safe_load_all(resp.text)))
|
|
@ -0,0 +1,97 @@
|
|||
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import yaml
|
||||
|
||||
from deckhand import factories
|
||||
from deckhand.tests.unit.control import base as test_base
|
||||
|
||||
|
||||
class TestRevisionTagsControllerNegativeRBAC(test_base.BaseControllerTest):
|
||||
"""Test suite for validating negative RBAC scenarios for revision tags
|
||||
controller.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super(TestRevisionTagsControllerNegativeRBAC, self).setUp()
|
||||
rules = {'deckhand:create_cleartext_documents': '@'}
|
||||
self.policy.set_rules(rules)
|
||||
|
||||
# Create a revision to tag.
|
||||
secrets_factory = factories.DocumentSecretFactory()
|
||||
payload = [secrets_factory.gen_test('Certificate', 'cleartext')]
|
||||
resp = self.app.simulate_put(
|
||||
'/api/v1.0/buckets/mop/documents',
|
||||
headers={'Content-Type': 'application/x-yaml'},
|
||||
body=yaml.safe_dump_all(payload))
|
||||
self.assertEqual(200, resp.status_code)
|
||||
self.revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
|
||||
'revision']
|
||||
|
||||
def test_revision_list_tags_except_forbidden(self):
|
||||
rules = {'deckhand:list_tags': 'rule:admin_api'}
|
||||
self.policy.set_rules(rules)
|
||||
|
||||
resp = self.app.simulate_get(
|
||||
'/api/v1.0/revisions/%s/tags' % self.revision_id,
|
||||
headers={'Content-Type': 'application/x-yaml'})
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
def test_revision_show_tag_except_forbidden(self):
|
||||
rules = {'deckhand:create_tag': '@',
|
||||
'deckhand:show_tag': 'rule:admin_api'}
|
||||
self.policy.set_rules(rules)
|
||||
|
||||
resp = self.app.simulate_post(
|
||||
'/api/v1.0/revisions/%s/tags/%s' % (self.revision_id, 'test'),
|
||||
headers={'Content-Type': 'application/x-yaml'})
|
||||
self.assertEqual(201, resp.status_code)
|
||||
|
||||
resp = self.app.simulate_get(
|
||||
'/api/v1.0/revisions/%s/tags/test' % self.revision_id,
|
||||
headers={'Content-Type': 'application/x-yaml'})
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
def test_revision_create_tag_except_forbidden(self):
|
||||
rules = {'deckhand:create_tag': 'rule:admin_api'}
|
||||
self.policy.set_rules(rules)
|
||||
|
||||
resp = self.app.simulate_post(
|
||||
'/api/v1.0/revisions/%s/tags/%s' % (self.revision_id, 'test'),
|
||||
headers={'Content-Type': 'application/x-yaml'})
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
def test_revision_delete_tag_except_forbidden(self):
|
||||
rules = {'deckhand:create_tag': '@',
|
||||
'deckhand:delete_tag': 'rule:admin_api'}
|
||||
self.policy.set_rules(rules)
|
||||
|
||||
resp = self.app.simulate_post(
|
||||
'/api/v1.0/revisions/%s/tags/%s' % (self.revision_id, 'test'),
|
||||
headers={'Content-Type': 'application/x-yaml'})
|
||||
self.assertEqual(201, resp.status_code)
|
||||
|
||||
resp = self.app.simulate_delete(
|
||||
'/api/v1.0/revisions/%s/tags/%s' % (self.revision_id, 'test'),
|
||||
headers={'Content-Type': 'application/x-yaml'})
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
def test_revision_delete_all_tags_except_forbidden(self):
|
||||
rules = {'deckhand:delete_tags': 'rule:admin_api'}
|
||||
self.policy.set_rules(rules)
|
||||
|
||||
resp = self.app.simulate_delete(
|
||||
'/api/v1.0/revisions/%s/tags' % self.revision_id,
|
||||
headers={'Content-Type': 'application/x-yaml'})
|
||||
self.assertEqual(403, resp.status_code)
|
|
@ -0,0 +1,66 @@
|
|||
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import yaml
|
||||
|
||||
from deckhand import factories
|
||||
from deckhand.tests.unit.control import base as test_base
|
||||
|
||||
|
||||
class TestRevisionsControllerNegativeRBAC(test_base.BaseControllerTest):
|
||||
"""Test suite for validating negative RBAC scenarios for revisions
|
||||
controller.
|
||||
"""
|
||||
|
||||
def test_list_revisions_except_forbidden(self):
|
||||
rules = {'deckhand:list_revisions': 'rule:admin_api'}
|
||||
self.policy.set_rules(rules)
|
||||
|
||||
resp = self.app.simulate_get(
|
||||
'/api/v1.0/revisions',
|
||||
headers={'Content-Type': 'application/x-yaml'})
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
def test_show_revision_except_forbidden(self):
|
||||
rules = {'deckhand:show_revision': 'rule:admin_api',
|
||||
'deckhand:create_cleartext_documents': '@'}
|
||||
self.policy.set_rules(rules)
|
||||
|
||||
# Create a bucket with a document to generate a revision_id.
|
||||
secrets_factory = factories.DocumentSecretFactory()
|
||||
payload = [secrets_factory.gen_test('Certificate', 'cleartext')]
|
||||
|
||||
resp = self.app.simulate_put(
|
||||
'/api/v1.0/buckets/mop/documents',
|
||||
headers={'Content-Type': 'application/x-yaml'},
|
||||
body=yaml.safe_dump_all(payload))
|
||||
self.assertEqual(200, resp.status_code)
|
||||
revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
|
||||
'revision']
|
||||
|
||||
# Then try to query the revision "show" endpoint with insufficient
|
||||
# permissions.
|
||||
resp = self.app.simulate_get(
|
||||
'/api/v1.0/revisions/%s' % revision_id,
|
||||
headers={'Content-Type': 'application/x-yaml'})
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
def test_delete_revisions_except_forbidden(self):
|
||||
rules = {'deckhand:delete_revisions': 'rule:admin_api'}
|
||||
self.policy.set_rules(rules)
|
||||
|
||||
resp = self.app.simulate_delete(
|
||||
'/api/v1.0/revisions',
|
||||
headers={'Content-Type': 'application/x-yaml'})
|
||||
self.assertEqual(403, resp.status_code)
|
|
@ -0,0 +1,30 @@
|
|||
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from deckhand.tests.unit.control import base as test_base
|
||||
|
||||
|
||||
class TestRevisionsDiffControllerNegativeRBAC(test_base.BaseControllerTest):
|
||||
"""Test suite for validating negative RBAC scenarios for revisions diff
|
||||
controller.
|
||||
"""
|
||||
|
||||
def test_show_revision_diff_except_forbidden(self):
|
||||
rules = {'deckhand:show_revision_diff': 'rule:admin_api'}
|
||||
self.policy.set_rules(rules)
|
||||
|
||||
resp = self.app.simulate_get(
|
||||
'/api/v1.0/revisions/0/diff/0',
|
||||
headers={'Content-Type': 'application/x-yaml'})
|
||||
self.assertEqual(403, resp.status_code)
|
|
@ -0,0 +1,81 @@
|
|||
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import yaml
|
||||
|
||||
import mock
|
||||
|
||||
from deckhand.control import buckets
|
||||
from deckhand import factories
|
||||
from deckhand.tests.unit.control import base as test_base
|
||||
|
||||
|
||||
class TestRevisionsRollbackControllerNegativeRBAC(
|
||||
test_base.BaseControllerTest):
|
||||
"""Test suite for validating negative RBAC scenarios for revisions rollback
|
||||
controller.
|
||||
"""
|
||||
|
||||
def test_revision_rollback_cleartext_except_forbidden(self):
|
||||
rules = {'deckhand:create_cleartext_documents': '@'}
|
||||
self.policy.set_rules(rules)
|
||||
|
||||
# Create a revision so we have something to roll back to.
|
||||
secrets_factory = factories.DocumentSecretFactory()
|
||||
payload = [secrets_factory.gen_test('Certificate', 'cleartext')]
|
||||
resp = self.app.simulate_put(
|
||||
'/api/v1.0/buckets/mop/documents',
|
||||
headers={'Content-Type': 'application/x-yaml'},
|
||||
body=yaml.safe_dump_all(payload))
|
||||
self.assertEqual(200, resp.status_code)
|
||||
revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
|
||||
'revision']
|
||||
|
||||
rules = {'deckhand:create_cleartext_documents': 'rule:admin_api'}
|
||||
self.policy.set_rules(rules)
|
||||
|
||||
resp = self.app.simulate_post(
|
||||
'/api/v1.0/rollback/%s' % revision_id,
|
||||
headers={'Content-Type': 'application/x-yaml'})
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
def test_revision_rollback_encrypted_except_forbidden(self):
|
||||
rules = {'deckhand:create_encrypted_documents': '@',
|
||||
'deckhand:create_cleartext_documents': '@'}
|
||||
self.policy.set_rules(rules)
|
||||
|
||||
# Create a revision so we have something to roll back to.
|
||||
secrets_factory = factories.DocumentSecretFactory()
|
||||
payload = [secrets_factory.gen_test('Certificate', 'encrypted')]
|
||||
|
||||
with mock.patch.object(buckets.BucketsResource, 'secrets_mgr',
|
||||
autospec=True) as mock_secrets_mgr:
|
||||
mock_secrets_mgr.create.return_value = {
|
||||
'secret': payload[0]['data']}
|
||||
resp = self.app.simulate_put(
|
||||
'/api/v1.0/buckets/mop/documents',
|
||||
headers={'Content-Type': 'application/x-yaml'},
|
||||
body=yaml.safe_dump_all(payload))
|
||||
self.assertEqual(200, resp.status_code)
|
||||
revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
|
||||
'revision']
|
||||
|
||||
rules = {'deckhand:create_encrypted_documents': 'rule:admin_api',
|
||||
'deckhand:create_cleartext_documents': '@'}
|
||||
self.policy.set_rules(rules)
|
||||
|
||||
resp = self.app.simulate_post(
|
||||
'/api/v1.0/rollback/%s' % revision_id,
|
||||
headers={'Content-Type': 'application/x-yaml'})
|
||||
self.assertEqual(403, resp.status_code)
|
|
@ -512,7 +512,6 @@ class TestValidationsController(test_base.BaseControllerTest):
|
|||
depends on substitution from another document.
|
||||
"""
|
||||
rules = {'deckhand:create_cleartext_documents': '@',
|
||||
'deckhand:create_validation': '@',
|
||||
'deckhand:list_validations': '@'}
|
||||
self.policy.set_rules(rules)
|
||||
|
||||
|
|
|
@ -147,52 +147,55 @@ class TestDocuments(base.TestDbBase):
|
|||
rand_secret = {'secret': test_utils.rand_password()}
|
||||
bucket_name = test_utils.rand_name('bucket')
|
||||
|
||||
for storage_policy in ('encrypted', 'cleartext'):
|
||||
for expected_len, storage_policy in enumerate(
|
||||
('encrypted', 'cleartext')):
|
||||
secret_doc_payload = self.secrets_factory.gen_test(
|
||||
'Certificate', storage_policy, rand_secret)
|
||||
created_documents = self.create_documents(
|
||||
bucket_name, secret_doc_payload)
|
||||
|
||||
self.assertEqual(1, len(created_documents))
|
||||
self.assertIn('Certificate', created_documents[0]['schema'])
|
||||
self.assertEqual(storage_policy, created_documents[0][
|
||||
self.assertEqual(expected_len + 1, len(created_documents))
|
||||
self.assertIn('Certificate', created_documents[-1]['schema'])
|
||||
self.assertEqual(storage_policy, created_documents[-1][
|
||||
'metadata']['storagePolicy'])
|
||||
self.assertTrue(created_documents[0]['is_secret'])
|
||||
self.assertEqual(rand_secret, created_documents[0]['data'])
|
||||
self.assertTrue(created_documents[-1]['is_secret'])
|
||||
self.assertEqual(rand_secret, created_documents[-1]['data'])
|
||||
|
||||
def test_create_certificate_key(self):
|
||||
rand_secret = {'secret': test_utils.rand_password()}
|
||||
bucket_name = test_utils.rand_name('bucket')
|
||||
|
||||
for storage_policy in ('encrypted', 'cleartext'):
|
||||
for expected_len, storage_policy in enumerate(
|
||||
('encrypted', 'cleartext')):
|
||||
secret_doc_payload = self.secrets_factory.gen_test(
|
||||
'CertificateKey', storage_policy, rand_secret)
|
||||
created_documents = self.create_documents(
|
||||
bucket_name, secret_doc_payload)
|
||||
|
||||
self.assertEqual(1, len(created_documents))
|
||||
self.assertIn('CertificateKey', created_documents[0]['schema'])
|
||||
self.assertEqual(storage_policy, created_documents[0][
|
||||
self.assertEqual(expected_len + 1, len(created_documents))
|
||||
self.assertIn('CertificateKey', created_documents[-1]['schema'])
|
||||
self.assertEqual(storage_policy, created_documents[-1][
|
||||
'metadata']['storagePolicy'])
|
||||
self.assertTrue(created_documents[0]['is_secret'])
|
||||
self.assertEqual(rand_secret, created_documents[0]['data'])
|
||||
self.assertTrue(created_documents[-1]['is_secret'])
|
||||
self.assertEqual(rand_secret, created_documents[-1]['data'])
|
||||
|
||||
def test_create_passphrase(self):
|
||||
rand_secret = {'secret': test_utils.rand_password()}
|
||||
bucket_name = test_utils.rand_name('bucket')
|
||||
|
||||
for storage_policy in ('encrypted', 'cleartext'):
|
||||
for expected_len, storage_policy in enumerate(
|
||||
('encrypted', 'cleartext')):
|
||||
secret_doc_payload = self.secrets_factory.gen_test(
|
||||
'Passphrase', storage_policy, rand_secret)
|
||||
created_documents = self.create_documents(
|
||||
bucket_name, secret_doc_payload)
|
||||
|
||||
self.assertEqual(1, len(created_documents))
|
||||
self.assertIn('Passphrase', created_documents[0]['schema'])
|
||||
self.assertEqual(storage_policy, created_documents[0][
|
||||
self.assertEqual(expected_len + 1, len(created_documents))
|
||||
self.assertIn('Passphrase', created_documents[-1]['schema'])
|
||||
self.assertEqual(storage_policy, created_documents[-1][
|
||||
'metadata']['storagePolicy'])
|
||||
self.assertTrue(created_documents[0]['is_secret'])
|
||||
self.assertEqual(rand_secret, created_documents[0]['data'])
|
||||
self.assertTrue(created_documents[-1]['is_secret'])
|
||||
self.assertEqual(rand_secret, created_documents[-1]['data'])
|
||||
|
||||
def test_delete_document(self):
|
||||
payload = base.DocumentFixture.get_minimal_fixture()
|
||||
|
|
|
@ -19,6 +19,7 @@ import os
|
|||
import yaml
|
||||
|
||||
import fixtures
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_policy import opts as policy_opts
|
||||
from oslo_policy import policy as oslo_policy
|
||||
|
@ -84,6 +85,51 @@ class RealPolicyFixture(fixtures.Fixture):
|
|||
deckhand.policy.reset()
|
||||
deckhand.policy.init()
|
||||
self.addCleanup(deckhand.policy.reset)
|
||||
self._install_policy_verification_hook()
|
||||
|
||||
def _install_policy_verification_hook(self):
|
||||
"""Install policy verification hook for validating RBAC.
|
||||
|
||||
This function's purpose is to guarantee that policy enforcement is
|
||||
happening the way we expect it to. It validates that the policies
|
||||
that are passed to ``self.policy.set_rules`` from within a test that
|
||||
uses this fixture is a subset of the actual policies that are enforced
|
||||
by Deckhand controllers.
|
||||
|
||||
The algorithm is as follows:
|
||||
|
||||
1) Initialize list of actual policy actions to remember.
|
||||
2) Initialize list of expected policy actions to remember.
|
||||
3) Reference a pre-mocked copy of the policy enforcement function
|
||||
that is ultimately called by Deckhand for policy enforcement.
|
||||
4a) Create a hook that stores the actual policy for later.
|
||||
4b) The hook then calls the *real* policy enforcement function
|
||||
using the reference from step 3).
|
||||
5) Mock the policy enforcement function and have it instead call
|
||||
our hook from step 4a).
|
||||
6) Add a clean up to undo the mock from step 5).
|
||||
|
||||
There is a tight coupling between this function and ``set_rules``
|
||||
below.
|
||||
|
||||
The comparison between ``self.expected_policy_actions`` and
|
||||
``self.actual_policy_actions`` should be done in the ``tearDown``
|
||||
function of the class that uses this fixture.
|
||||
"""
|
||||
self.actual_policy_actions = []
|
||||
self.expected_policy_actions = []
|
||||
_do_enforce_rbac = deckhand.policy._do_enforce_rbac
|
||||
|
||||
def enforce_policy_and_remember_actual_rules(
|
||||
action, *a, **k):
|
||||
self.actual_policy_actions.append(action)
|
||||
_do_enforce_rbac(action, *a, **k)
|
||||
|
||||
mock_do_enforce_rbac = mock.patch.object(
|
||||
deckhand.policy, '_do_enforce_rbac', autospec=True).start()
|
||||
mock_do_enforce_rbac.side_effect = (
|
||||
enforce_policy_and_remember_actual_rules)
|
||||
self.addCleanup(mock.patch.stopall)
|
||||
|
||||
def add_missing_default_rules(self, rules):
|
||||
"""Adds default rules and their values to the given rules dict.
|
||||
|
@ -97,8 +143,21 @@ class RealPolicyFixture(fixtures.Fixture):
|
|||
rules[rule.name] = rule.check_str
|
||||
|
||||
def set_rules(self, rules, overwrite=True):
|
||||
"""Set the custom policy rules to override.
|
||||
|
||||
:param dict rules: Dictionary keyed with policy actions enforced
|
||||
by Deckhand whose values are a custom rule understood by
|
||||
``oslo.policy`` library.
|
||||
|
||||
This function overrides the default policy rules with the custom rules
|
||||
specified by ``rules``. The ``rules`` passed here are added to
|
||||
``self.expected_policy_actions`` for later comparison with
|
||||
``self.actual_policy_actions``.
|
||||
"""
|
||||
if isinstance(rules, dict):
|
||||
rules = oslo_policy.Rules.from_dict(rules)
|
||||
|
||||
self.expected_policy_actions.extend(rules)
|
||||
|
||||
policy = deckhand.policy._ENFORCER
|
||||
policy.set_rules(rules, overwrite=overwrite)
|
||||
|
|
Loading…
Reference in New Issue