Search all scopes for catalogs
This patch revises pegleg to check all scopes for catalog files, rather than just the site scope. Change-Id: Icf03ce739968ef3ed5fd218ca0fe87e24451ba0d
This commit is contained in:
parent
81720cd0c8
commit
0252b71750
|
@ -14,7 +14,6 @@
|
||||||
|
|
||||||
from abc import ABC
|
from abc import ABC
|
||||||
import logging
|
import logging
|
||||||
import os
|
|
||||||
import re
|
import re
|
||||||
|
|
||||||
from pegleg import config
|
from pegleg import config
|
||||||
|
@ -43,7 +42,7 @@ class BaseCatalog(ABC):
|
||||||
"""
|
"""
|
||||||
self._documents = documents or definition.documents_for_site(sitename)
|
self._documents = documents or definition.documents_for_site(sitename)
|
||||||
self._site_name = sitename
|
self._site_name = sitename
|
||||||
self._catalog_path = None
|
self._catalog_path = []
|
||||||
self._kind = kind
|
self._kind = kind
|
||||||
self._catalog_docs = list()
|
self._catalog_docs = list()
|
||||||
for document in self._documents:
|
for document in self._documents:
|
||||||
|
@ -61,7 +60,7 @@ class BaseCatalog(ABC):
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def catalog_path(self):
|
def catalog_path(self):
|
||||||
if self._catalog_path is None:
|
if not self._catalog_path:
|
||||||
self._set_catalog_path()
|
self._set_catalog_path()
|
||||||
return self._catalog_path
|
return self._catalog_path
|
||||||
|
|
||||||
|
@ -69,15 +68,14 @@ class BaseCatalog(ABC):
|
||||||
repo_name = git.repo_url(config.get_site_repo())
|
repo_name = git.repo_url(config.get_site_repo())
|
||||||
catalog_name = self._get_document_name('{}.yaml'.format(self._kind))
|
catalog_name = self._get_document_name('{}.yaml'.format(self._kind))
|
||||||
for file_path in definition.site_files(self.site_name):
|
for file_path in definition.site_files(self.site_name):
|
||||||
if file_path.endswith(catalog_name) and repo_name in file_path:
|
if file_path.endswith(catalog_name):
|
||||||
self._catalog_path = os.path.join(
|
self._catalog_path.append(file_path)
|
||||||
repo_name, file_path.split(repo_name)[1].lstrip('/'))
|
if not self._catalog_path:
|
||||||
return
|
# Cound not find the Catalog for this generated passphrase
|
||||||
# Cound not find the Catalog for this generated passphrase
|
# raise an exception.
|
||||||
# raise an exception.
|
LOG.error('Catalog path: {} was not found in repo: {}'.format(
|
||||||
LOG.error('Catalog path: {} was not found in repo: {}'.format(
|
catalog_name, repo_name))
|
||||||
catalog_name, repo_name))
|
raise PassphraseCatalogNotFoundException()
|
||||||
raise PassphraseCatalogNotFoundException()
|
|
||||||
|
|
||||||
def _get_document_name(self, name):
|
def _get_document_name(self, name):
|
||||||
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1-\2', name)
|
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1-\2', name)
|
||||||
|
|
|
@ -69,6 +69,24 @@ data:
|
||||||
...
|
...
|
||||||
""")
|
""")
|
||||||
|
|
||||||
|
TEST_GLOBAL_PASSPHRASES_CATALOG = yaml.load("""
|
||||||
|
---
|
||||||
|
schema: pegleg/PassphraseCatalog/v1
|
||||||
|
metadata:
|
||||||
|
schema: metadata/Document/v1
|
||||||
|
name: cluster-passphrases
|
||||||
|
layeringDefinition:
|
||||||
|
abstract: false
|
||||||
|
layer: global
|
||||||
|
storagePolicy: cleartext
|
||||||
|
data:
|
||||||
|
passphrases:
|
||||||
|
- description: 'description of passphrase from global'
|
||||||
|
document_name: passphrase_from_global
|
||||||
|
encrypted: true
|
||||||
|
...
|
||||||
|
""")
|
||||||
|
|
||||||
TEST_REPOSITORIES = {
|
TEST_REPOSITORIES = {
|
||||||
'repositories': {
|
'repositories': {
|
||||||
'global': {
|
'global': {
|
||||||
|
@ -101,6 +119,7 @@ TEST_SITE_DEFINITION = {
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_PASSPHRASES_CATALOG]
|
TEST_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_PASSPHRASES_CATALOG]
|
||||||
|
TEST_GLOBAL_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_GLOBAL_PASSPHRASES_CATALOG]
|
||||||
|
|
||||||
|
|
||||||
def test_cryptostring_default_len():
|
def test_cryptostring_default_len():
|
||||||
|
@ -149,13 +168,13 @@ def test_cryptostring_long_len():
|
||||||
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC',
|
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC',
|
||||||
ENV_SALT: 'MySecretSalt'})
|
ENV_SALT: 'MySecretSalt'})
|
||||||
def test_generate_passphrases(*_):
|
def test_generate_passphrases(*_):
|
||||||
dir = tempfile.mkdtemp()
|
_dir = tempfile.mkdtemp()
|
||||||
os.makedirs(os.path.join(dir, 'cicd_site_repo'), exist_ok=True)
|
os.makedirs(os.path.join(_dir, 'cicd_site_repo'), exist_ok=True)
|
||||||
PassphraseGenerator('cicd', dir, 'test_author').generate()
|
PassphraseGenerator('cicd', _dir, 'test_author').generate()
|
||||||
|
|
||||||
for passphrase in TEST_PASSPHRASES_CATALOG['data']['passphrases']:
|
for passphrase in TEST_PASSPHRASES_CATALOG['data']['passphrases']:
|
||||||
passphrase_file_name = '{}.yaml'.format(passphrase['document_name'])
|
passphrase_file_name = '{}.yaml'.format(passphrase['document_name'])
|
||||||
passphrase_file_path = os.path.join(dir, 'site', 'cicd', 'secrets',
|
passphrase_file_path = os.path.join(_dir, 'site', 'cicd', 'secrets',
|
||||||
'passphrases',
|
'passphrases',
|
||||||
passphrase_file_name)
|
passphrase_file_name)
|
||||||
assert os.path.isfile(passphrase_file_path)
|
assert os.path.isfile(passphrase_file_path)
|
||||||
|
@ -200,3 +219,52 @@ def test_generate_passphrases_exception(capture):
|
||||||
('Signature verification to decrypt secrets failed. '
|
('Signature verification to decrypt secrets failed. '
|
||||||
'Please check your provided passphrase and salt and '
|
'Please check your provided passphrase and salt and '
|
||||||
'try again.')))
|
'try again.')))
|
||||||
|
|
||||||
|
|
||||||
|
@mock.patch.object(
|
||||||
|
util.definition,
|
||||||
|
'documents_for_site',
|
||||||
|
autospec=True,
|
||||||
|
return_value=TEST_GLOBAL_SITE_DOCUMENTS)
|
||||||
|
@mock.patch.object(
|
||||||
|
pegleg.config,
|
||||||
|
'get_site_repo',
|
||||||
|
autospec=True,
|
||||||
|
return_value='cicd_site_repo')
|
||||||
|
@mock.patch.object(
|
||||||
|
util.definition,
|
||||||
|
'site_files',
|
||||||
|
autospec=True,
|
||||||
|
return_value=[
|
||||||
|
'cicd_global_repo/site/cicd/passphrases/passphrase-catalog.yaml', ])
|
||||||
|
@mock.patch.dict(os.environ, {
|
||||||
|
ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC',
|
||||||
|
ENV_SALT: 'MySecretSalt'})
|
||||||
|
def test_global_passphrase_catalog(*_):
|
||||||
|
_dir = tempfile.mkdtemp()
|
||||||
|
os.makedirs(os.path.join(_dir, 'cicd_site_repo'), exist_ok=True)
|
||||||
|
PassphraseGenerator('cicd', _dir, 'test_author').generate()
|
||||||
|
|
||||||
|
for passphrase in TEST_GLOBAL_PASSPHRASES_CATALOG['data']['passphrases']:
|
||||||
|
passphrase_file_name = '{}.yaml'.format(passphrase['document_name'])
|
||||||
|
passphrase_file_path = os.path.join(_dir, 'site', 'cicd', 'secrets',
|
||||||
|
'passphrases',
|
||||||
|
passphrase_file_name)
|
||||||
|
assert os.path.isfile(passphrase_file_path)
|
||||||
|
with open(passphrase_file_path) as stream:
|
||||||
|
doc = yaml.safe_load(stream)
|
||||||
|
assert doc['schema'] == 'pegleg/PeglegManagedDocument/v1'
|
||||||
|
assert doc['metadata']['storagePolicy'] == 'cleartext'
|
||||||
|
assert 'encrypted' in doc['data']
|
||||||
|
assert doc['data']['encrypted']['by'] == 'test_author'
|
||||||
|
assert 'generated' in doc['data']
|
||||||
|
assert doc['data']['generated']['by'] == 'test_author'
|
||||||
|
assert 'managedDocument' in doc['data']
|
||||||
|
assert doc['data']['managedDocument']['metadata'][
|
||||||
|
'storagePolicy'] == 'encrypted'
|
||||||
|
decrypted_passphrase = encryption.decrypt(
|
||||||
|
doc['data']['managedDocument']['data'],
|
||||||
|
os.environ['PEGLEG_PASSPHRASE'].encode(),
|
||||||
|
os.environ['PEGLEG_SALT'].encode())
|
||||||
|
if passphrase_file_name == "passphrase_from_global.yaml":
|
||||||
|
assert len(decrypted_passphrase) == 24
|
||||||
|
|
Loading…
Reference in New Issue