diff --git a/pegleg/engine/catalogs/passphrase_catalog.py b/pegleg/engine/catalogs/passphrase_catalog.py index 0e1e4725..44129352 100644 --- a/pegleg/engine/catalogs/passphrase_catalog.py +++ b/pegleg/engine/catalogs/passphrase_catalog.py @@ -24,8 +24,10 @@ P_LENGTH = 'length' P_DESCRIPTION = 'description' P_ENCRYPTED = 'encrypted' P_CLEARTEXT = 'cleartext' +P_ENCODING = 'encoding' P_DEFAULT_LENGTH = 24 P_DEFAULT_STORAGE_POLICY = 'encrypted' +P_DEFAULT_ENCODING = 'none' __all__ = ['PassphraseCatalog'] @@ -86,3 +88,18 @@ class PassphraseCatalog(BaseCatalog): return P_CLEARTEXT else: return P_DEFAULT_STORAGE_POLICY + + def get_encoding_method(self, passphrase_name): + """Return the encoding method of the ``passphrase_name``. + + If the catalog does not specify an encoding method for the + ``passphrase_name``, return the default encoding method, 'none'. + :param str passphrase_name: The name of the passphrase to evaluate. + :returns: The encoding method to be used for ``passphrase_name``. + :rtype: str + """ + + for c_doc in self._catalog_docs: + for passphrase in c_doc['data']['passphrases']: + if passphrase[P_DOCUMENT_NAME] == passphrase_name: + return passphrase.get(P_ENCODING, P_DEFAULT_ENCODING) diff --git a/pegleg/engine/generators/passphrase_generator.py b/pegleg/engine/generators/passphrase_generator.py index 77b28f42..21fbe12d 100644 --- a/pegleg/engine/generators/passphrase_generator.py +++ b/pegleg/engine/generators/passphrase_generator.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import base64 from getpass import getpass import logging @@ -69,6 +70,11 @@ class PassphraseGenerator(BaseGenerator): if not passphrase: passphrase = self._pass_util.get_crypto_string( self._catalog.get_length(p_name)) + encoding_method = self._catalog.get_encoding_method(p_name) + if encoding_method == 'base64': + # Convert string to bytes, then encode in base64 + passphrase = passphrase.encode() + passphrase = base64.b64encode(passphrase) docs = list() storage_policy = self._catalog.get_storage_policy(p_name) docs.append(self.generate_doc( diff --git a/pegleg/engine/util/pegleg_secret_management.py b/pegleg/engine/util/pegleg_secret_management.py index d832a2b6..f2e2671d 100644 --- a/pegleg/engine/util/pegleg_secret_management.py +++ b/pegleg/engine/util/pegleg_secret_management.py @@ -163,9 +163,11 @@ class PeglegSecretManagement(object): # policies doc_list.append(doc.embedded_document) continue - + secret_doc = doc.get_secret() + if type(secret_doc) != bytes: + secret_doc = secret_doc.encode() doc.set_secret( - encrypt(doc.get_secret().encode(), self.passphrase, self.salt)) + encrypt(secret_doc, self.passphrase, self.salt)) doc.set_encrypted(self._author) encrypted_docs = True doc_list.append(doc.pegleg_document) diff --git a/tests/unit/engine/test_generate_passphrases.py b/tests/unit/engine/test_generate_passphrases.py index 85a72dbd..5d11b65c 100644 --- a/tests/unit/engine/test_generate_passphrases.py +++ b/tests/unit/engine/test_generate_passphrases.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import base64 import os import tempfile import uuid @@ -23,6 +24,7 @@ from testfixtures import log_capture import yaml from pegleg.engine.generators.passphrase_generator import PassphraseGenerator +from pegleg.engine.util.cryptostring import CryptoString from pegleg.engine.util import encryption from pegleg.engine import util import pegleg @@ -85,6 +87,32 @@ data: ... """) +TEST_BASE64_PASSPHRASES_CATALOG = yaml.load(""" +--- +schema: pegleg/PassphraseCatalog/v1 +metadata: + schema: metadata/Document/v1 + name: cluster-passphrases + layeringDefinition: + abstract: false + layer: global + storagePolicy: cleartext +data: + passphrases: + - description: 'description of base64 required passphrases' + document_name: base64_encoded_passphrase_doc + encrypted: true + encoding: base64 + - description: 'description of not base64 encoded passphrases' + document_name: not_encoded + encrypted: true + encoding: none + - description: 'description of not base64 encoded passphrases' + document_name: also_not_encoded + encrypted: true +... +""") + TEST_REPOSITORIES = { 'repositories': { 'global': { @@ -118,6 +146,7 @@ TEST_SITE_DEFINITION = { TEST_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_PASSPHRASES_CATALOG] TEST_GLOBAL_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_GLOBAL_PASSPHRASES_CATALOG] +TEST_BASE64_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_BASE64_PASSPHRASES_CATALOG] @mock.patch.object( @@ -240,3 +269,66 @@ def test_global_passphrase_catalog(*_): os.environ['PEGLEG_SALT'].encode()) if passphrase_file_name == "passphrase_from_global.yaml": assert len(decrypted_passphrase) == 24 + + +@mock.patch.object( + util.definition, + 'documents_for_site', + autospec=True, + return_value=TEST_BASE64_SITE_DOCUMENTS) +@mock.patch.object( + pegleg.config, + 'get_site_repo', + autospec=True, + return_value='cicd_site_repo') +@mock.patch.object( + util.definition, + 'site_files', + autospec=True, + return_value=[ + 'cicd_global_repo/site/cicd/passphrases/passphrase-catalog.yaml', ]) +@mock.patch.dict(os.environ, { + ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC', + ENV_SALT: 'MySecretSalt1234567890]['}) +def test_base64_passphrase_catalog(*_): + _dir = tempfile.mkdtemp() + os.makedirs(os.path.join(_dir, 'cicd_site_repo'), exist_ok=True) + PassphraseGenerator('cicd', _dir, 'test_author').generate() + + for passphrase in TEST_BASE64_PASSPHRASES_CATALOG['data']['passphrases']: + passphrase_file_name = '{}.yaml'.format(passphrase['document_name']) + passphrase_file_path = os.path.join(_dir, 'site', 'cicd', 'secrets', + 'passphrases', + passphrase_file_name) + assert os.path.isfile(passphrase_file_path) + with open(passphrase_file_path) as stream: + doc = yaml.safe_load(stream) + decrypted_passphrase = encryption.decrypt( + doc['data']['managedDocument']['data'], + os.environ['PEGLEG_PASSPHRASE'].encode(), + os.environ['PEGLEG_SALT'].encode()) + if passphrase_file_name == "base64_encoded_passphrase_doc.yaml": + assert decrypted_passphrase == base64.b64encode( + base64.b64decode(decrypted_passphrase)) + + +@mock.patch.dict(os.environ, { + ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC', + ENV_SALT: 'MySecretSalt1234567890]['}) +def test_crypt_coding_flow(): + cs_util = CryptoString() + orig_passphrase = cs_util.get_crypto_string() + bytes_passphrase = orig_passphrase.encode() + b64_passphrase = base64.b64encode(bytes_passphrase) + encrypted = encryption.encrypt(b64_passphrase, + os.environ['PEGLEG_PASSPHRASE'].encode(), + os.environ['PEGLEG_SALT'].encode() + ) + decrypted = encryption.decrypt(encrypted, + os.environ['PEGLEG_PASSPHRASE'].encode(), + os.environ['PEGLEG_SALT'].encode() + ) + assert encrypted != decrypted + assert decrypted == b64_passphrase + assert base64.b64decode(decrypted) == bytes_passphrase + assert bytes_passphrase.decode() == orig_passphrase \ No newline at end of file