From 5a58ba807ab0c5d316ee09a9c206d988c899ae69 Mon Sep 17 00:00:00 2001 From: Alexander Hughes Date: Fri, 10 May 2019 09:30:53 -0500 Subject: [PATCH] Support b64 encoding of passphrase catalog Some applications, such as k8s, require a base64 encoded string. This patch updates the passphrase catalog such that the user can specify to use base64 encoding on one or more passphrases found in the passphrase catalog. We add that support by: 1. Updating pegleg.engine.catalogs.passphrase_catalog to include a method which determines what encoding type to use, if any. 2. Updating pegleg.engine.generators.passphrase_generator.generate to encode the passphrase in base64 if detected. This change is designed to easily add other supported encoding methods in the future if desired. 3. Updating tests.unit.engine.test_generate_passphrases to demonstrate that the encoding field in passphrase catalog is being used, and that the resultant passphrase is in fact base64 encoded. Also show that when encoding type is not specified, or is set to 'none' that base64 encoding does not take place. 4. Updating tests.unit.engine.test_generate_passphrases to demonstrate that the encoding field in passphrase catalog is being used, and that the resultant passphrase is in fact base64 encoded. We also demonstrate the flow from original passphrase to bytes, to base64 encoded, to encrypted, and back again yields the expected values at each step of encoding/decoding/encryption and decryption. Change-Id: I47c740ca13be57ed74b6780f80c90b39e935708b --- pegleg/engine/catalogs/passphrase_catalog.py | 17 ++++ .../engine/generators/passphrase_generator.py | 6 ++ .../engine/util/pegleg_secret_management.py | 6 +- .../unit/engine/test_generate_passphrases.py | 92 +++++++++++++++++++ 4 files changed, 119 insertions(+), 2 deletions(-) diff --git a/pegleg/engine/catalogs/passphrase_catalog.py b/pegleg/engine/catalogs/passphrase_catalog.py index 0e1e4725..44129352 100644 --- a/pegleg/engine/catalogs/passphrase_catalog.py +++ b/pegleg/engine/catalogs/passphrase_catalog.py @@ -24,8 +24,10 @@ P_LENGTH = 'length' P_DESCRIPTION = 'description' P_ENCRYPTED = 'encrypted' P_CLEARTEXT = 'cleartext' +P_ENCODING = 'encoding' P_DEFAULT_LENGTH = 24 P_DEFAULT_STORAGE_POLICY = 'encrypted' +P_DEFAULT_ENCODING = 'none' __all__ = ['PassphraseCatalog'] @@ -86,3 +88,18 @@ class PassphraseCatalog(BaseCatalog): return P_CLEARTEXT else: return P_DEFAULT_STORAGE_POLICY + + def get_encoding_method(self, passphrase_name): + """Return the encoding method of the ``passphrase_name``. + + If the catalog does not specify an encoding method for the + ``passphrase_name``, return the default encoding method, 'none'. + :param str passphrase_name: The name of the passphrase to evaluate. + :returns: The encoding method to be used for ``passphrase_name``. + :rtype: str + """ + + for c_doc in self._catalog_docs: + for passphrase in c_doc['data']['passphrases']: + if passphrase[P_DOCUMENT_NAME] == passphrase_name: + return passphrase.get(P_ENCODING, P_DEFAULT_ENCODING) diff --git a/pegleg/engine/generators/passphrase_generator.py b/pegleg/engine/generators/passphrase_generator.py index 77b28f42..21fbe12d 100644 --- a/pegleg/engine/generators/passphrase_generator.py +++ b/pegleg/engine/generators/passphrase_generator.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import base64 from getpass import getpass import logging @@ -69,6 +70,11 @@ class PassphraseGenerator(BaseGenerator): if not passphrase: passphrase = self._pass_util.get_crypto_string( self._catalog.get_length(p_name)) + encoding_method = self._catalog.get_encoding_method(p_name) + if encoding_method == 'base64': + # Convert string to bytes, then encode in base64 + passphrase = passphrase.encode() + passphrase = base64.b64encode(passphrase) docs = list() storage_policy = self._catalog.get_storage_policy(p_name) docs.append(self.generate_doc( diff --git a/pegleg/engine/util/pegleg_secret_management.py b/pegleg/engine/util/pegleg_secret_management.py index d832a2b6..f2e2671d 100644 --- a/pegleg/engine/util/pegleg_secret_management.py +++ b/pegleg/engine/util/pegleg_secret_management.py @@ -163,9 +163,11 @@ class PeglegSecretManagement(object): # policies doc_list.append(doc.embedded_document) continue - + secret_doc = doc.get_secret() + if type(secret_doc) != bytes: + secret_doc = secret_doc.encode() doc.set_secret( - encrypt(doc.get_secret().encode(), self.passphrase, self.salt)) + encrypt(secret_doc, self.passphrase, self.salt)) doc.set_encrypted(self._author) encrypted_docs = True doc_list.append(doc.pegleg_document) diff --git a/tests/unit/engine/test_generate_passphrases.py b/tests/unit/engine/test_generate_passphrases.py index 85a72dbd..5d11b65c 100644 --- a/tests/unit/engine/test_generate_passphrases.py +++ b/tests/unit/engine/test_generate_passphrases.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import base64 import os import tempfile import uuid @@ -23,6 +24,7 @@ from testfixtures import log_capture import yaml from pegleg.engine.generators.passphrase_generator import PassphraseGenerator +from pegleg.engine.util.cryptostring import CryptoString from pegleg.engine.util import encryption from pegleg.engine import util import pegleg @@ -85,6 +87,32 @@ data: ... """) +TEST_BASE64_PASSPHRASES_CATALOG = yaml.load(""" +--- +schema: pegleg/PassphraseCatalog/v1 +metadata: + schema: metadata/Document/v1 + name: cluster-passphrases + layeringDefinition: + abstract: false + layer: global + storagePolicy: cleartext +data: + passphrases: + - description: 'description of base64 required passphrases' + document_name: base64_encoded_passphrase_doc + encrypted: true + encoding: base64 + - description: 'description of not base64 encoded passphrases' + document_name: not_encoded + encrypted: true + encoding: none + - description: 'description of not base64 encoded passphrases' + document_name: also_not_encoded + encrypted: true +... +""") + TEST_REPOSITORIES = { 'repositories': { 'global': { @@ -118,6 +146,7 @@ TEST_SITE_DEFINITION = { TEST_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_PASSPHRASES_CATALOG] TEST_GLOBAL_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_GLOBAL_PASSPHRASES_CATALOG] +TEST_BASE64_SITE_DOCUMENTS = [TEST_SITE_DEFINITION, TEST_BASE64_PASSPHRASES_CATALOG] @mock.patch.object( @@ -240,3 +269,66 @@ def test_global_passphrase_catalog(*_): os.environ['PEGLEG_SALT'].encode()) if passphrase_file_name == "passphrase_from_global.yaml": assert len(decrypted_passphrase) == 24 + + +@mock.patch.object( + util.definition, + 'documents_for_site', + autospec=True, + return_value=TEST_BASE64_SITE_DOCUMENTS) +@mock.patch.object( + pegleg.config, + 'get_site_repo', + autospec=True, + return_value='cicd_site_repo') +@mock.patch.object( + util.definition, + 'site_files', + autospec=True, + return_value=[ + 'cicd_global_repo/site/cicd/passphrases/passphrase-catalog.yaml', ]) +@mock.patch.dict(os.environ, { + ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC', + ENV_SALT: 'MySecretSalt1234567890]['}) +def test_base64_passphrase_catalog(*_): + _dir = tempfile.mkdtemp() + os.makedirs(os.path.join(_dir, 'cicd_site_repo'), exist_ok=True) + PassphraseGenerator('cicd', _dir, 'test_author').generate() + + for passphrase in TEST_BASE64_PASSPHRASES_CATALOG['data']['passphrases']: + passphrase_file_name = '{}.yaml'.format(passphrase['document_name']) + passphrase_file_path = os.path.join(_dir, 'site', 'cicd', 'secrets', + 'passphrases', + passphrase_file_name) + assert os.path.isfile(passphrase_file_path) + with open(passphrase_file_path) as stream: + doc = yaml.safe_load(stream) + decrypted_passphrase = encryption.decrypt( + doc['data']['managedDocument']['data'], + os.environ['PEGLEG_PASSPHRASE'].encode(), + os.environ['PEGLEG_SALT'].encode()) + if passphrase_file_name == "base64_encoded_passphrase_doc.yaml": + assert decrypted_passphrase == base64.b64encode( + base64.b64decode(decrypted_passphrase)) + + +@mock.patch.dict(os.environ, { + ENV_PASSPHRASE: 'ytrr89erARAiPE34692iwUMvWqqBvC', + ENV_SALT: 'MySecretSalt1234567890]['}) +def test_crypt_coding_flow(): + cs_util = CryptoString() + orig_passphrase = cs_util.get_crypto_string() + bytes_passphrase = orig_passphrase.encode() + b64_passphrase = base64.b64encode(bytes_passphrase) + encrypted = encryption.encrypt(b64_passphrase, + os.environ['PEGLEG_PASSPHRASE'].encode(), + os.environ['PEGLEG_SALT'].encode() + ) + decrypted = encryption.decrypt(encrypted, + os.environ['PEGLEG_PASSPHRASE'].encode(), + os.environ['PEGLEG_SALT'].encode() + ) + assert encrypted != decrypted + assert decrypted == b64_passphrase + assert base64.b64decode(decrypted) == bytes_passphrase + assert bytes_passphrase.decode() == orig_passphrase \ No newline at end of file