Add path option to encrypt command

Adds a path option to the encrypt command to specify what directory of
file to encrypt. If path is not specified, all applicable files in the
repositories will be encrypted (this is the current behavior).

Change-Id: Idd5e063a54cf157a8ec761de85cbd67edd05364c
This commit is contained in:
Ian H Pittwood 2019-11-12 14:31:13 -06:00 committed by Ian H. Pittwood
parent 7afa31f42f
commit ff9c95f423
5 changed files with 77 additions and 25 deletions

View File

@ -642,7 +642,13 @@ repository folder structure. The ``encrypt`` command looks up the
whose ``encryptionPolicy`` is set to ``encrypted``), and encrypts the whose ``encryptionPolicy`` is set to ``encrypted``), and encrypts the
documents in those files. documents in those files.
**-a / \\-\\-author** (Required) **-p / \\-\\-path** (Optional).
The file or directory path to encrypt. If a path is not provided, all
applicable files discovered in the user specified repositories for
``site_name`` will be encrypted.
**-a / \\-\\-author** (Required).
Author is the identifier for the program or the person, who is encrypting Author is the identifier for the program or the person, who is encrypting
the secrets documents. the secrets documents.

View File

@ -307,7 +307,7 @@ def secrets():
default=False, default=False,
show_default=True, show_default=True,
help='Force Pegleg to regenerate all PKI items.') help='Force Pegleg to regenerate all PKI items.')
@click.argument('site_name') @utils.SITE_REPOSITORY_ARGUMENT
def generate_pki_deprecated(site_name, author, days, regenerate_all): def generate_pki_deprecated(site_name, author, days, regenerate_all):
"""Generate certificates, certificate authorities and keypairs for a given """Generate certificates, certificate authorities and keypairs for a given
site. site.
@ -361,7 +361,7 @@ def generate_pki_deprecated(site_name, author, days, regenerate_all):
default=True, default=True,
show_default=True, show_default=True,
help='Whether to encrypt the wrapped file.') help='Whether to encrypt the wrapped file.')
@click.argument('site_name') @utils.SITE_REPOSITORY_ARGUMENT
def wrap_secret_cli( def wrap_secret_cli(
*, site_name, author, filename, output_path, schema, name, layer, *, site_name, author, filename, output_path, schema, name, layer,
encrypt): encrypt):
@ -401,7 +401,7 @@ def genesis_bundle(*, build_dir, validators, site_name):
'days', 'days',
default=60, default=60,
help='The number of days past today to check if certificates are valid.') help='The number of days past today to check if certificates are valid.')
@click.argument('site_name') @utils.SITE_REPOSITORY_ARGUMENT
def check_pki_certs(site_name, days): def check_pki_certs(site_name, days):
"""Check PKI certificates of a site for expiration.""" """Check PKI certificates of a site for expiration."""
expiring_certs_exist, cert_results = pegleg_main.run_check_pki_certs( expiring_certs_exist, cert_results = pegleg_main.run_check_pki_certs(
@ -496,7 +496,7 @@ def generate():
'generated, wrapped, and encrypted passphrases files will be saved ' 'generated, wrapped, and encrypted passphrases files will be saved '
'in: <save_location>/site/<site_name>/secrets/certificates/ ' 'in: <save_location>/site/<site_name>/secrets/certificates/ '
'directory. Defaults to site repository path if no value given.') 'directory. Defaults to site repository path if no value given.')
@click.argument('site_name') @utils.SITE_REPOSITORY_ARGUMENT
def generate_pki(site_name, author, days, regenerate_all, save_location): def generate_pki(site_name, author, days, regenerate_all, save_location):
"""Generate certificates, certificate authorities and keypairs for a given """Generate certificates, certificate authorities and keypairs for a given
site. site.
@ -508,7 +508,7 @@ def generate_pki(site_name, author, days, regenerate_all, save_location):
@generate.command('passphrases', help='Command to generate site passphrases') @generate.command('passphrases', help='Command to generate site passphrases')
@click.argument('site_name') @utils.SITE_REPOSITORY_ARGUMENT
@click.option( @click.option(
'-s', '-s',
'--save-location', '--save-location',
@ -562,11 +562,20 @@ def generate_passphrases(
help='Command to encrypt and wrap site secrets ' help='Command to encrypt and wrap site secrets '
'documents with metadata.storagePolicy set ' 'documents with metadata.storagePolicy set '
'to encrypted, in pegleg managed documents.') 'to encrypted, in pegleg managed documents.')
@click.option(
'-p',
'--path',
'path',
type=click.Path(exists=True, readable=True),
required=False,
help='The file or directory path to encrypt. '
'If path is not provided, all applicable files for the site '
'will be encrypted.')
@click.option( @click.option(
'-s', '-s',
'--save-location', '--save-location',
'save_location', 'save_location',
default=None, required=True,
help='Directory to output the encrypted site secrets files. Created ' help='Directory to output the encrypted site secrets files. Created '
'automatically if it does not already exist. ' 'automatically if it does not already exist. '
'If save_location is not provided, the output encrypted files will ' 'If save_location is not provided, the output encrypted files will '
@ -578,9 +587,9 @@ def generate_passphrases(
required=True, required=True,
help='Identifier for the program or person who is encrypting the secrets ' help='Identifier for the program or person who is encrypting the secrets '
'documents') 'documents')
@click.argument('site_name') @utils.SITE_REPOSITORY_ARGUMENT
def encrypt(*, save_location, author, site_name): def encrypt(*, path, save_location, author, site_name):
pegleg_main.run_encrypt(author, save_location, site_name) pegleg_main.run_encrypt(author, save_location, site_name, path=path)
@secrets.command( @secrets.command(
@ -608,7 +617,7 @@ def encrypt(*, save_location, author, site_name):
default=False, default=False,
help='Overwrites original file(s) at path with decrypted data when set. ' help='Overwrites original file(s) at path with decrypted data when set. '
'Overrides --save-location option.') 'Overrides --save-location option.')
@click.argument('site_name') @utils.SITE_REPOSITORY_ARGUMENT
def decrypt(*, path, save_location, overwrite, site_name): def decrypt(*, path, save_location, overwrite, site_name):
data = pegleg_main.run_decrypt(overwrite, path, save_location, site_name) data = pegleg_main.run_decrypt(overwrite, path, save_location, site_name)
if data: if data:

View File

@ -37,7 +37,7 @@ __all__ = ('encrypt', 'decrypt', 'generate_passphrases', 'wrap_secret')
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def encrypt(save_location, author, site_name): def encrypt(save_location, author, site_name, path=None):
""" """
Encrypt all secrets documents for a site identifies by site_name. Encrypt all secrets documents for a site identifies by site_name.
@ -56,21 +56,52 @@ def encrypt(save_location, author, site_name):
:param str author: Identifies the individual or application, who :param str author: Identifies the individual or application, who
encrypts the secrets documents. encrypts the secrets documents.
:param str site_name: The name of the site to encrypt its secrets files. :param str site_name: The name of the site to encrypt its secrets files.
:param str path: The path to the directory or file to be encrypted.
""" """
files.check_file_save_location(save_location) files.check_file_save_location(save_location)
LOG.debug('Save location is %s', save_location)
file_sets = []
path_exists = path and os.path.exists(path)
if path_exists:
if os.path.isfile(path):
LOG.debug('Specified path is a file')
file_sets = [(None, path)]
elif os.path.isdir(path):
LOG.debug('Specified path is a directory')
file_sets = []
for filename in glob(os.path.join(path, '**/*.yaml'),
recursive=True):
LOG.debug('Discovered %s', filename)
file_sets.append((None, filename))
else:
LOG.debug('No path specified, searching all repos')
file_sets = list(definition.site_files_by_repo(site_name))
LOG.info('Started encrypting...') LOG.info('Started encrypting...')
secrets_found = False secrets_found = False
for repo_base, file_path in definition.site_files_by_repo(site_name): for repo_base, file_path in file_sets:
LOG.debug('Looking at %s in %s repo', file_path, repo_base)
secrets_found = True secrets_found = True
PeglegSecretManagement( secret = PeglegSecretManagement(
file_path=file_path, author=author, file_path=file_path, author=author, site_name=site_name)
site_name=site_name).encrypt_secrets( if path_exists:
_get_dest_path(repo_base, file_path, save_location)) if save_location:
output_path = os.path.join(
save_location.rstrip(os.path.sep),
file_path.lstrip(os.path.sep))
else:
output_path = file_path
else:
output_path = _get_dest_path(repo_base, file_path, save_location)
LOG.debug('Outputting encrypted data to %s', output_path)
secret.encrypt_secrets(output_path)
if secrets_found: if secrets_found:
LOG.info('Encryption of all secret files was completed.') LOG.info('Encryption of all secret files was completed.')
else: else:
LOG.warn( LOG.warning(
'No secret documents were found for site: {}'.format(site_name)) 'No secret documents were found for site: {}'.format(site_name))

View File

@ -368,7 +368,7 @@ def run_generate_passphrases(
force_cleartext=force_cleartext) force_cleartext=force_cleartext)
def run_encrypt(author, save_location, site_name): def run_encrypt(author, save_location, site_name, path=None):
"""Wraps and encrypts site secret documents """Wraps and encrypts site secret documents
:param author: identifies author generating new certificates for :param author: identifies author generating new certificates for
@ -376,12 +376,14 @@ def run_encrypt(author, save_location, site_name):
:param save_location: path to save encrypted documents to, if None the :param save_location: path to save encrypted documents to, if None the
original documents are overwritten original documents are overwritten
:param site_name: site name to process :param site_name: site name to process
:param path: path to the document(s) to encrypt
:return: :return:
""" """
config.set_global_enc_keys(site_name) config.set_global_enc_keys(site_name)
if save_location is None: if save_location is None and path is None:
save_location = config.get_site_repo() save_location = config.get_site_repo()
engine.secrets.encrypt(save_location, author, site_name=site_name) engine.secrets.encrypt(
save_location, author, site_name=site_name, path=path)
def run_decrypt(overwrite, path, save_location, site_name): def run_decrypt(overwrite, path, save_location, site_name):

View File

@ -586,7 +586,10 @@ class TestSiteSecretsActions(BaseCLIActionTest):
with open(file_path, "w") as ceph_fsid_fi: with open(file_path, "w") as ceph_fsid_fi:
yaml.dump(ceph_fsid, ceph_fsid_fi) yaml.dump(ceph_fsid, ceph_fsid_fi)
secrets_opts = ['secrets', 'encrypt', '-a', 'test', self.site_name] secrets_opts = [
'secrets', 'encrypt', '--save-location', repo_path, '-a', 'test',
self.site_name
]
result = self.runner.invoke( result = self.runner.invoke(
commands.site, ['--no-decrypt', '-r', repo_path] + secrets_opts) commands.site, ['--no-decrypt', '-r', repo_path] + secrets_opts)
@ -955,14 +958,15 @@ class TestCliSiteSubcommandsWithDecryptOption(BaseCLIActionTest):
@pytest.mark.skipif( @pytest.mark.skipif(
not pki_utility.PKIUtility.cfssl_exists(), not pki_utility.PKIUtility.cfssl_exists(),
reason='cfssl must be installed to execute these tests') reason='cfssl must be installed to execute these tests')
def test_check_pki_certs_expired_using_decrypt_option(self): def test_check_pki_certs_expired_using_decrypt_option(self, tmpdir):
repo_path = self.treasuremap_path repo_path = self.treasuremap_path
secrets_opts = ['secrets', 'check-pki-certs', self.site_name] secrets_opts = ['secrets', 'check-pki-certs', self.site_name]
result = self.runner.invoke( result = self.runner.invoke(
commands.site, ['--decrypt', '-r', repo_path] + secrets_opts) commands.site,
['--decrypt', '-r', repo_path, '-p', tmpdir] + secrets_opts)
assert result.exit_code == 1, result.output assert result.exit_code == 1, result.output
assert self._validate_no_files_encrypted( assert self._validate_no_files_encrypted(
os.path.join(repo_path, 'site', 'seaworthy', 'secrets')) os.path.join(tmpdir, 'site', 'seaworthy', 'secrets'))
def test_genesis_bundle_using_decrypt_option(self, tmpdir): def test_genesis_bundle_using_decrypt_option(self, tmpdir):
repo_path = self.treasuremap_path repo_path = self.treasuremap_path