diff --git a/pegleg/engine/catalogs/passphrase_catalog.py b/pegleg/engine/catalogs/passphrase_catalog.py index 6b3231ea..906271cb 100644 --- a/pegleg/engine/catalogs/passphrase_catalog.py +++ b/pegleg/engine/catalogs/passphrase_catalog.py @@ -15,7 +15,7 @@ import logging from pegleg.engine.catalogs.base_catalog import BaseCatalog -from pegleg.engine.exceptions import PassphraseCatalogNotFoundException +from pegleg.engine import exceptions LOG = logging.getLogger(__name__) KIND = 'PassphraseCatalog' @@ -24,10 +24,16 @@ P_LENGTH = 'length' P_DESCRIPTION = 'description' P_ENCRYPTED = 'encrypted' P_CLEARTEXT = 'cleartext' -P_ENCODING = 'encoding' +P_TYPE = 'type' +P_REGENERABLE = 'regenerable' +P_PROMPT = 'prompt' P_DEFAULT_LENGTH = 24 P_DEFAULT_STORAGE_POLICY = 'encrypted' -P_DEFAULT_ENCODING = 'none' +P_DEFAULT_TYPE = 'passphrase' +P_DEFAULT_REGENERABLE = True +P_DEFAULT_PROMPT = False +VALID_PASSPHRASE_TYPES = ['passphrase', 'base64', 'uuid'] +VALID_BOOLEAN_FIELDS = [True, False] __all__ = ['PassphraseCatalog'] @@ -51,7 +57,7 @@ class PassphraseCatalog(BaseCatalog): """ super(PassphraseCatalog, self).__init__(KIND, sitename, documents) if not self._catalog_docs: - raise PassphraseCatalogNotFoundException() + raise exceptions.PassphraseCatalogNotFoundException() @property def get_passphrase_names(self): @@ -88,17 +94,78 @@ class PassphraseCatalog(BaseCatalog): else: return P_DEFAULT_STORAGE_POLICY - def get_encoding_method(self, passphrase_name): - """Return the encoding method of the ``passphrase_name``. + def get_passphrase_type(self, passphrase_name): + """Return the type of the ``passphrase_name``. - If the catalog does not specify an encoding method for the - ``passphrase_name``, return the default encoding method, 'none'. - :param str passphrase_name: The name of the passphrase to evaluate. - :returns: The encoding method to be used for ``passphrase_name``. - :rtype: str + Determine what type of secret this passphrase name is. Valid options: + 1. passphrase (a randomly generated passphrase) + 2. base64 (a randomly generated passphrase, encoded with base64) + 3. uuid (a randomly generated UUID) + + If an invalid option is specified, raise an exception. If a valid + option is specified, return it. If no option is specified, default to + passphrase. """ for c_doc in self._catalog_docs: for passphrase in c_doc['data']['passphrases']: if passphrase[P_DOCUMENT_NAME] == passphrase_name: - return passphrase.get(P_ENCODING, P_DEFAULT_ENCODING) + passphrase_type = passphrase.get(P_TYPE, + P_DEFAULT_TYPE).lower() + if passphrase_type not in VALID_PASSPHRASE_TYPES: + raise exceptions.InvalidPassphraseType( + ptype=passphrase_type, + pname=passphrase_name, + validvalues=VALID_PASSPHRASE_TYPES) + else: + return passphrase_type + + def is_passphrase_regenerable(self, passphrase_name): + """Return the regenerable field of the ``passphrase_name``. + + Determines if this passphrase name is regenerable. + Valid options: True, False. + If no option is specified, default to True. If an invalid option is + specified, raise an exception + + """ + # UUIDs should not be regenerated + if self.get_passphrase_type(passphrase_name) == 'uuid': + return False + + # All other types can be regenerated + for c_doc in self._catalog_docs: + for passphrase in c_doc['data']['passphrases']: + if passphrase[P_DOCUMENT_NAME] == passphrase_name: + passphrase_regenerable = passphrase.get( + P_REGENERABLE, P_DEFAULT_REGENERABLE) + if passphrase_regenerable not in VALID_BOOLEAN_FIELDS: + raise exceptions.InvalidPassphraseRegeneration( + pregen=passphrase_regenerable, + pname=passphrase_name, + validvalues=VALID_BOOLEAN_FIELDS) + else: + return passphrase_regenerable + + def is_passphrase_prompt(self, passphrase_name): + """Return the prompt field of the ``passphrase_name``. + + Determines if this passphrase name should be generated interactively. + Valid options: True, False. + If no option is specified, default to False. If an invalid option is + specified, raise an exception + + """ + + for c_doc in self._catalog_docs: + for passphrase in c_doc['data']['passphrases']: + if passphrase[P_DOCUMENT_NAME] == passphrase_name: + passphrase_prompt = passphrase.get( + P_PROMPT, P_DEFAULT_PROMPT) + if passphrase_prompt not in VALID_BOOLEAN_FIELDS: + raise exceptions.InvalidPassphrasePrompt( + pprompt=passphrase_prompt, + pname=passphrase_name, + validvalues=VALID_BOOLEAN_FIELDS) + else: + return passphrase_prompt diff --git a/pegleg/engine/exceptions.py b/pegleg/engine/exceptions.py index 4cd96477..4b0a0598 100644 --- a/pegleg/engine/exceptions.py +++ b/pegleg/engine/exceptions.py @@ -92,6 +92,27 @@ class PassphraseCatalogNotFoundException(PeglegBaseException): 'the site Passphrases!') +class InvalidPassphraseType(PeglegBaseException): + """Invalid Passphrase type""" + message = ( + 'Invalid Passphrase type %(ptype)s specified for %(pname)s. Valid ' + 'values are: %(validvalues)s.') + + +class InvalidPassphrasePrompt(PeglegBaseException): + """Invalid Passphrase prompt field""" + message = ( + 'Invalid Passphrase prompt %(pprompt)s specified for %(pname)s. Valid ' + 'values are: %(validvalues)s.') + + +class InvalidPassphraseRegeneration(PeglegBaseException): + """Invalid Regenerable value for entry in passphrase-catalog""" + message = ( + 'Invalid Regenerable value %(pregen)s specified for %(pname)s. Valid ' + 'values are: %(validvalues)s.') + + class GenesisBundleEncryptionException(PeglegBaseException): """Exception raised when encryption of the genesis bundle fails.""" diff --git a/pegleg/engine/generators/passphrase_generator.py b/pegleg/engine/generators/passphrase_generator.py index e84c2302..55d947f3 100644 --- a/pegleg/engine/generators/passphrase_generator.py +++ b/pegleg/engine/generators/passphrase_generator.py @@ -15,6 +15,11 @@ import base64 from getpass import getpass import logging +import os +import re + +import click +from oslo_utils import uuidutils from pegleg.engine.catalogs import passphrase_catalog from pegleg.engine.catalogs.passphrase_catalog import PassphraseCatalog @@ -64,19 +69,45 @@ class PassphraseGenerator(BaseGenerator): :param bool force_cleartext: If true, don't encrypt """ for p_name in self._catalog.get_passphrase_names: + # Check if this secret is present and should not be regenerated + save_path = self.get_save_path(p_name) + regenerable = self._catalog.is_passphrase_regenerable(p_name) + if os.path.exists(save_path) and not regenerable: + continue + + # Generate secret as it either does not exist yet or is a + # regenerable secret and does exist but should be rotated. passphrase = None - if interactive: - passphrase = getpass( - prompt="Input passphrase for {}. Leave blank to " - "auto-generate:\n".format(p_name)) + passphrase_type = self._catalog.get_passphrase_type(p_name) + prompt = self._catalog.is_passphrase_prompt(p_name) + if interactive or prompt: + passphrase = self.get_interactive_pass(p_name) + + if passphrase_type == 'uuid': # nosec + validated = uuidutils.is_uuid_like(passphrase) + while passphrase and not validated: + click.echo('Passphrase {} is not a valid uuid.') + passphrase = self.get_interactive_pass(p_name) + validated = uuidutils.is_uuid_like(passphrase) + + elif passphrase_type == 'base64': # nosec + validated = self.is_base64_like(passphrase) + while passphrase and not validated: + click.echo('Passphrase {} is not base64 like.') + passphrase = self.get_interactive_pass(p_name) + validated = self.is_base64_like(passphrase) + if not passphrase: - passphrase = self._pass_util.get_crypto_string( - self._catalog.get_length(p_name)) - encoding_method = self._catalog.get_encoding_method(p_name) - if encoding_method == 'base64': - # Convert string to bytes, then encode in base64 - passphrase = passphrase.encode() - passphrase = base64.b64encode(passphrase) + if passphrase_type == 'uuid': # nosec + passphrase = uuidutils.generate_uuid() + else: + passphrase = self._pass_util.get_crypto_string( + self._catalog.get_length(p_name)) + if passphrase_type == 'base64': # nosec + # Take the randomly generated string and convert to a + # random base64 string + passphrase = passphrase.encode() + passphrase = base64.b64encode(passphrase).decode() docs = list() if force_cleartext: storage_policy = passphrase_catalog.P_CLEARTEXT @@ -88,7 +119,6 @@ class PassphraseGenerator(BaseGenerator): docs.append( self.generate_doc(KIND, p_name, storage_policy, passphrase)) - save_path = self.get_save_path(p_name) if storage_policy == passphrase_catalog.P_ENCRYPTED: PeglegSecretManagement( docs=docs, @@ -98,6 +128,23 @@ class PassphraseGenerator(BaseGenerator): else: files.write(docs, save_path) + def get_interactive_pass(self, p_name): + passphrase = getpass( + prompt="Input passphrase/UUID for {}. Leave blank to " + "auto-generate:\n".format(p_name)) + return passphrase + + def is_base64_like(self, passphrase): + pattern = re.compile( + "^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{4}|[A-Za-z0-9+" + "/]{3}=|[A-Za-z0-9+/]{2}==)$") + if not passphrase or len(passphrase) < 1: + return False + elif pattern.match(passphrase): + return True + else: + return False + @property def kind_path(self): return KIND_PATH diff --git a/requirements.txt b/requirements.txt index da006beb..437572e4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,7 +7,8 @@ python-dateutil==2.7.3 docker==3.7.2 requests==2.20.0 urllib3==1.24.3 -chardet==3.0.4 +chardet==3.0.4 +oslo.utils==3.41.0 # External dependencies git+https://opendev.org/airship/deckhand.git@134c55805b13b2d3f430a7c0fee840990c55c0aa diff --git a/tests/unit/engine/test_generate_passphrases.py b/tests/unit/engine/test_generate_passphrases.py index 35ce19dd..22b0a41a 100644 --- a/tests/unit/engine/test_generate_passphrases.py +++ b/tests/unit/engine/test_generate_passphrases.py @@ -87,7 +87,7 @@ data: ... """) -TEST_BASE64_PASSPHRASES_CATALOG = yaml.safe_load( +TEST_TYPES_CATALOG = yaml.safe_load( """ --- schema: pegleg/PassphraseCatalog/v1 @@ -103,13 +103,18 @@ data: - description: 'description of base64 required passphrases' document_name: base64_encoded_passphrase_doc encrypted: true - encoding: base64 - - description: 'description of not base64 encoded passphrases' - document_name: not_encoded + type: base64 + - description: 'description of uuid secret' + document_name: uuid_passphrase_doc encrypted: true encoding: none - - description: 'description of not base64 encoded passphrases' - document_name: also_not_encoded + type: uuid + - description: 'description of random passphrase' + document_name: passphrase_doc + encrypted: true + type: passphrase + - description: 'description of default random passphrase' + document_name: default_passphrase_doc encrypted: true ... """) @@ -150,9 +155,7 @@ TEST_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_PASSPHRASES_CATALOG] TEST_GLOBAL_SITE_DOCUMENTS = [ TEST_SITE_DEFINITION, TEST_GLOBAL_PASSPHRASES_CATALOG ] -TEST_BASE64_SITE_DOCUMENTS = [ - TEST_SITE_DEFINITION, TEST_BASE64_PASSPHRASES_CATALOG -] +TEST_TYPE_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_TYPES_CATALOG] @mock.patch.object( @@ -289,7 +292,7 @@ def test_global_passphrase_catalog(*_): util.definition, 'documents_for_site', autospec=True, - return_value=TEST_BASE64_SITE_DOCUMENTS) + return_value=TEST_TYPE_SITE_DOCUMENTS) @mock.patch.object( pegleg.config, 'get_site_repo', @@ -307,12 +310,12 @@ def test_global_passphrase_catalog(*_): 'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC', 'PEGLEG_SALT': 'MySecretSalt1234567890][' }) -def test_base64_passphrase_catalog(*_): +def test_uuid_passphrase_catalog(*_): _dir = tempfile.mkdtemp() os.makedirs(os.path.join(_dir, 'cicd_site_repo'), exist_ok=True) PassphraseGenerator('cicd', _dir, 'test_author').generate() - for passphrase in TEST_BASE64_PASSPHRASES_CATALOG['data']['passphrases']: + for passphrase in TEST_TYPES_CATALOG['data']['passphrases']: passphrase_file_name = '{}.yaml'.format(passphrase['document_name']) passphrase_file_path = os.path.join( _dir, 'site', 'cicd', 'secrets', 'passphrases', @@ -324,28 +327,5 @@ def test_base64_passphrase_catalog(*_): doc['data']['managedDocument']['data'], os.environ['PEGLEG_PASSPHRASE'].encode(), os.environ['PEGLEG_SALT'].encode()) - if passphrase_file_name == "base64_encoded_passphrase_doc.yaml": - assert decrypted_passphrase == base64.b64encode( - base64.b64decode(decrypted_passphrase)) - - -@mock.patch.dict( - os.environ, { - 'PEGLEG_PASSPHRASE': 'ytrr89erARAiPE34692iwUMvWqqBvC', - 'PEGLEG_SALT': 'MySecretSalt1234567890][' - }) -def test_crypt_coding_flow(): - cs_util = CryptoString() - orig_passphrase = cs_util.get_crypto_string() - bytes_passphrase = orig_passphrase.encode() - b64_passphrase = base64.b64encode(bytes_passphrase) - encrypted = encryption.encrypt( - b64_passphrase, os.environ['PEGLEG_PASSPHRASE'].encode(), - os.environ['PEGLEG_SALT'].encode()) - decrypted = encryption.decrypt( - encrypted, os.environ['PEGLEG_PASSPHRASE'].encode(), - os.environ['PEGLEG_SALT'].encode()) - assert encrypted != decrypted - assert decrypted == b64_passphrase - assert base64.b64decode(decrypted) == bytes_passphrase - assert bytes_passphrase.decode() == orig_passphrase + if passphrase_file_name == "uuid_passphrase_doc.yaml": + assert uuid.UUID(decrypted_passphrase.decode()).version == 4