Merge "Pegleg encryption of site secrets"

This commit is contained in:
Zuul 2018-10-30 21:18:39 +00:00 committed by Gerrit Code Review
commit 4a352510d2
14 changed files with 835 additions and 9 deletions

View File

@ -391,6 +391,105 @@ A more complex example involves excluding certain linting checks:
.. _command-line-repository-overrides:
Secrets
-------
A sub-group of site command group, which allows you to perform secrets
level operations for secrets documents of a site.
::
./pegleg.sh site -r <site_repo> -e <extra_repo> secrets <command> <options>
Encrypt
^^^^^^^
Encrypt one site's secrets documents, which have the
metadata.storagePolicy set to encrypted, and wrap them in `pegleg managed
documents <https://airship-specs.readthedocs.io/en/latest/specs/approved/pegleg-secrets.html#peglegmanageddocument>`_.
**Note**: The encrypt command is idempotent. If the command is executed more
than once for a given site, it will skip the files, which are already
encrypted and wrapped in a pegleg managed document, and will only encrypt the
documents not encrypted before.
**site_name** (Required).
Name of the site.
**-a / --author** (Required)
Identifier for the program or person who is encrypting the secrets documents.
**-s / --save-location** (Optional).
Where to output encrypted and wrapped documents. If omitted, the results
will overwrite the original documents.
Usage:
::
./pegleg.sh site <options> secrets encrypt <site_name> -a <author_id> -s <save_location>
Examples
""""""""
Example with optional save location:
::
./pegleg.sh site -r /opt/site-manifests \
-e global=/opt/manifests \
-e secrets=/opt/security-manifests \
secrets encrypt <site_name> -a <author_id> -s /workspace
Example without optional save location:
::
./pegleg.sh site -r /opt/site-manifests \
-e global=/opt/manifests \
-e secrets=/opt/security-manifests \
secrets encrypt <site_name> -a <author_id>
Decrypt
^^^^^^^
Unwrap an encrypted secrets document from a `pegleg managed
document <https://airship-specs.readthedocs.io/en/latest/specs/approved/pegleg-secrets.html#peglegmanageddocument>`_,
decrypt the encrypted secrets, and dump the cleartext secrets file to
``stdout``.
**site_name** (Required).
Name of the site.
**-f / filename** (Required).
The absolute path to the pegleg managed encrypted secrets file.
Usage:
::
./pegleg.sh site <options> secrets decrypt <site_name> -f <file_path>
Examples
""""""""
Example:
::
./pegleg.sh site -r /opt/site-manifests \
-e global=/opt/manifests \
-e secrets=/opt/security-manifests \
secrets decrypt site1 -f \
/opt/security-manifests/site/site1/passwords/password1.yaml
CLI Repository Overrides
------------------------

Binary file not shown.

Before

Width:  |  Height:  |  Size: 37 KiB

After

Width:  |  Height:  |  Size: 37 KiB

View File

@ -358,3 +358,50 @@ def list_types(*, output_stream):
"""List type names for a given repository."""
engine.repository.process_site_repository(update_config=True)
engine.type.list_types(output_stream)
@site.group(name='secrets', help='Commands to manage site secrets documents')
def secrets():
pass
@secrets.command(
'encrypt',
help='Command to encrypt and wrap site secrets '
'documents with metadata.storagePolicy set '
'to encrypted, in pegleg managed documents.')
@click.option(
'-s',
'--save-location',
'save_location',
default=None,
help='Directory to output the encrypted site secrets files. Created '
'automatically if it does not already exist. '
'If save_location is not provided, the output encrypted files will '
'overwrite the original input files (default behavior)')
@click.option(
'-a',
'--author',
'author',
required=True,
help='Identifier for the program or person who is encrypting the secrets '
'documents')
@click.argument('site_name')
def encrypt(*, save_location, author, site_name):
engine.repository.process_repositories(site_name)
engine.secrets.encrypt(save_location, author, site_name)
@secrets.command(
'decrypt',
help='Command to unwrap and decrypt one site '
'secrets document and print it to stdout.')
@click.option(
'-f',
'--filename',
'file_name',
help='The file name to decrypt and print out to stdout')
@click.argument('site_name')
def decrypt(*, file_name, site_name):
engine.repository.process_repositories(site_name)
engine.secrets.decrypt(file_name, site_name)

View File

@ -19,6 +19,7 @@ from pegleg.engine import lint
from pegleg.engine import repository
from pegleg.engine import site
from pegleg.engine import type
from pegleg.engine import secrets
def __represent_multiline_yaml_str():

113
pegleg/engine/secrets.py Normal file
View File

@ -0,0 +1,113 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from pegleg.engine.util.pegleg_secret_management import PeglegSecretManagement
from pegleg.engine.util import files
from pegleg.engine.util import definition
__all__ = ('encrypt', 'decrypt')
LOG = logging.getLogger(__name__)
def encrypt(save_location, author, site_name):
"""
Encrypt all secrets documents for a site identifies by site_name.
Parse through all documents related to site_name and encrypt all
site documents which have metadata.storagePolicy: encrypted, and which are
not already encrypted and wrapped in a PeglegManagedDocument.
Passphrase and salt for the encryption are read from environment
variables ($PEGLEG_PASSPHRASE and $PEGLEG_SALT respectively).
By default, the resulting output files will overwrite the original
unencrypted secrets documents.
:param save_location: if provided, identifies the base directory to store
the encrypted secrets files. If not provided the encrypted secrets files
will overwrite the original unencrypted files (default behavior).
:type save_location: string
:param author: The identifier provided by the application or
the person who requests encrypt the site secrets documents.
:type author: string
:param site_name: The name of the site to encrypt its secrets files.
:type site_name: string
"""
files.check_file_save_location(save_location)
LOG.info('Started encrypting...')
secrets_found = False
for repo_base, file_path in definition.site_files_by_repo(site_name):
secrets_found = True
PeglegSecretManagement(file_path).encrypt_secrets(
_get_dest_path(repo_base, file_path, save_location), author)
if secrets_found:
LOG.info('Encryption of all secret files was completed.')
else:
LOG.warn(
'No secret documents were found for site: {}'.format(site_name))
def decrypt(file_path, site_name):
"""
Decrypt one secrets file and print the decrypted data to standard out.
Search in in secrets file of a site, identified by site_name, for a file
named file_name.
If the file is found and encrypted, unwrap and decrypt it and print the
result to standard out.
If the file is found, but it is not encrypted, print the contents of the
file to standard out.
Passphrase and salt for the decryption are read from environment variables.
:param file_path: Path to the file to be unwrapped and decrypted.
:type file_path: string
:param site_name: The name of the site to search for the file.
:type site_name: string providing the site name
"""
LOG.info('Started decrypting...')
if os.path.isfile(file_path) \
and [s for s in file_path.split(os.path.sep) if s == site_name]:
PeglegSecretManagement(file_path).decrypt_secrets()
else:
LOG.info('File: {} was not found. Check your file path and name, '
'and try again.'.format(file_path))
def _get_dest_path(repo_base, file_path, save_location):
"""
Calculate and return the destination base directory path for the
encrypted or decrypted secrets files.
:param repo_base: Base repo of the source secrets file.
:type repo_base: string
:param file_path: File path to the source secrets file.
:type file_path: string
:param save_location: Base location of destination secrets file
:type save_location: string
:return: The file path of the destination secrets file.
:rtype: string
"""
if save_location \
and save_location != os.path.sep \
and save_location.endswith(os.path.sep):
save_location = save_location.rstrip(os.path.sep)
if repo_base and repo_base.endswith(os.path.sep):
repo_base = repo_base.rstrip(os.path.sep)
if save_location:
return file_path.replace(repo_base, save_location)
else:
return file_path

View File

@ -21,6 +21,7 @@ import yaml
from prettytable import PrettyTable
from pegleg.engine import util
from pegleg.engine.util import files
__all__ = ('collect', 'list_', 'show', 'render')
@ -55,14 +56,8 @@ def _collect_to_file(site_name, save_location):
"""Collects all documents related to ``site_name`` and outputs them to
the file denoted by ``save_location``.
"""
if not os.path.exists(save_location):
LOG.debug("Collection save location %s does not exist. Creating "
"automatically.", save_location)
os.makedirs(save_location)
# In case save_location already exists and isn't a directory.
if not os.path.isdir(save_location):
raise click.ClickException('save_location %s already exists, but must '
'be a directory' % save_location)
files.check_file_save_location(save_location)
save_files = dict()
try:

View File

@ -0,0 +1,129 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import base64
from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.exceptions import InvalidSignature
KEY_LENGTH = 32
ITERATIONS = 10000
LOG = logging.getLogger(__name__)
def encrypt(unencrypted_data,
passphrase,
salt,
key_length=KEY_LENGTH,
iterations=ITERATIONS):
"""
Encrypt the data, using the provided passphrase and salt,
and return the encrypted data.
:param unencrypted_data: Secret data to encrypt
:type unencrypted_data: bytes
:param passphrase: Passphrase to use to generate encryption key. Must be
at least 24-byte long
:type passphrase: bytes
:param salt: salt to use to generate encryption key. Must be randomly
generated.
:type salt: bytes
:param key_length: Length of the encryption key to generate, in bytes.
Will default to 32, if not provided.
:type key_length: positive integer.
:param iterations: A large number, used as seed to increase the entropy
in randomness of the generated key for encryption, and hence greatly
increase the security of encrypted data. will default to 10000, if not
provided.
:type iterations: positive integer.
:return: Encrypted secret data
:rtype: bytes
"""
return Fernet(_generate_key(passphrase, salt, key_length,
iterations)).encrypt(unencrypted_data)
def decrypt(encrypted_data,
passphrase,
salt,
key_length=KEY_LENGTH,
iterations=ITERATIONS):
"""
Decrypt the data, using the provided passphrase and salt,
and return the decrypted data.
:param encrypted_data: Encrypted secret data
:type encrypted_data: bytes
:param passphrase: Passphrase to use to generate decryption key. Must be
at least 32-byte long.
:type passphrase: bytes
:param salt: salt to use to generate decryption key. Must be randomly
generated.
:type salt: bytes
:param key_length: Length of the decryption key to generate, in bytes.
will default to 32, if not provided.
:type key_length: positive integer.
:param iterations: A large number, used as seed to increase entropy in
the randomness of the generated key for decryption, and hence greatly
increase the security of encrypted data. Will default to 10000, if not
provided.
:type iterations: positive integer.
:return: Decrypted secret data
:rtype: bytes
:raises InvalidSignature: If the provided passphrase, and/or
salt does not match the values used to encrypt the data.
"""
try:
return Fernet(_generate_key(passphrase, salt, key_length,
iterations)).decrypt(encrypted_data)
except InvalidSignature:
LOG.error('Signature verification to decrypt secrets failed. Please '
'check your provided passphrase and salt and try again.')
raise
def _generate_key(passphrase, salt, key_length, iterations):
"""
Use the passphrase and salt and PBKDF2HMAC key derivation algorithm,
to generate and and return a Fernet key to be used for encryption and
decryption of secret data.
:param passphrase: Passphrase to use to generate decryption key. Must be
at least 24-byte long.
:type passphrase: bytes
:param salt: salt to use to generate decryption key. Must be randomly
generated.
:type salt: bytes
:param key_length: Length of the decryption key to generate, in bytes.
Will default to 32, if not provided.
:type key_length: positive integer.
:param iterations: A large number, used as seed to increase the entropy
of the randomness of the generated key. will default to 10000, if not
provided.
:type iterations: positive integer.
:return: base64 encoded, URL safe Fernet key for encryption or decryption
"""
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=key_length,
salt=salt,
iterations=iterations,
backend=default_backend())
return base64.urlsafe_b64encode(kdf.derive(passphrase))

View File

@ -29,9 +29,12 @@ __all__ = [
'directories_for',
'directory_for',
'dump',
'read',
'write',
'existing_directories',
'search',
'slurp',
'check_file_save_location',
]
DIR_DEPTHS = {
@ -234,6 +237,48 @@ def dump(path, data):
yaml.dump(data, f, explicit_start=True)
def read(path):
"""
Read the yaml file ``path`` and return its contents as a list of
dicts
"""
if not os.path.exists(path):
raise click.ClickException(
'{} not found. Pegleg must be run from the root of a '
'configuration repository.'.format(path))
with open(path) as stream:
try:
return list(yaml.safe_load_all(stream))
except yaml.YAMLError as e:
raise click.ClickException('Failed to parse %s:\n%s' % (path, e))
def write(file_path, data):
"""
Write the data to destination file_path.
If the directory structure of the file_path should not exist, create it.
If the file should exit, overwrite it with new data,
:param file_path: Destination file for the written data file
:type file_path: str
:param data: data to be written to the destination file
:type data: dict or a list of dicts
"""
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, 'w') as stream:
yaml.safe_dump_all(
data,
stream,
explicit_start=True,
explicit_end=True,
default_flow_style=False)
def _recurse_subdirs(search_path, depth):
directories = set()
try:
@ -257,3 +302,25 @@ def search(search_paths):
for filename in filenames:
if filename.endswith(".yaml"):
yield os.path.join(root, filename)
def check_file_save_location(save_location):
"""
Verify exists and is a valid directory. If it does not exist create it.
:param save_location: Base directory to save the result of the
encryption or decryption of site secrets.
:type save_location: string, directory path
:raises click.ClickException: If pre-flight check should fail.
"""
if save_location:
if not os.path.exists(save_location):
LOG.debug("Save location %s does not exist. Creating "
"automatically.", save_location)
os.makedirs(save_location)
# In case save_location already exists and isn't a directory.
if not os.path.isdir(save_location):
raise click.ClickException(
'save_location %s already exists, '
'but is not a directory'.format(save_location))

View File

@ -0,0 +1,141 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from datetime import datetime
PEGLEG_MANAGED_SCHEMA = 'pegleg/PeglegManagedDocument/v1'
ENCRYPTED = 'encrypted'
STORAGE_POLICY = 'storagePolicy'
METADATA = 'metadata'
LOG = logging.getLogger(__name__)
class PeglegManagedSecretsDocument():
"""Object representing one Pegleg managed secret document."""
def __init__(self, secrets_document):
"""
Parse and wrap an externally generated document in a
pegleg managed document.
:param secrets_document: The content of the source document
:type secrets_document: dict
"""
if self.is_pegleg_managed_secret(secrets_document):
self._pegleg_document = secrets_document
else:
self._pegleg_document =\
self.__wrap(secrets_document)
self._embedded_document = \
self._pegleg_document['data']['managedDocument']
@staticmethod
def __wrap(secrets_document):
"""
Embeds a valid deckhand document in a pegleg managed document.
:param secrets_document: secrets document to be embedded in a
pegleg managed document.
:type secrets_document: dict
:return: pegleg manged document with the wrapped original secrets
document.
:rtype: dict
"""
return {
'schema': PEGLEG_MANAGED_SCHEMA,
'metadata': {
'name': secrets_document['metadata']['name'],
'schema': 'deckhand/Document/v1',
'labels': secrets_document['metadata'].get('labels', {}),
'layeringDefinition': {
'abstract': False,
# The current requirement only requires site layer.
'layer': 'site',
},
'storagePolicy': 'cleartext'
},
'data': {
'managedDocument': {
'schema': secrets_document['schema'],
'metadata': secrets_document['metadata'],
'data': secrets_document['data']
}
}
}
@staticmethod
def is_pegleg_managed_secret(secrets_document):
""""
Verify if the document is already a pegleg managed secrets document.
:return: True if the document is a pegleg managed secrets document,
False otherwise.
:rtype: bool
"""
return PEGLEG_MANAGED_SCHEMA in secrets_document.get('schema')
@property
def embedded_document(self):
"""
parse the pegleg managed document, and return the embedded document
:return: The original secrets document unwrapped from the pegleg
managed document.
:rtype: dict
"""
return self._embedded_document
@property
def name(self):
return self._pegleg_document.get('metadata', {}).get('name')
@property
def data(self):
return self._pegleg_document.get('data')
@property
def pegleg_document(self):
return self._pegleg_document
def is_encrypted(self):
"""If the document is already encrypted return True. False
otherwise."""
return ENCRYPTED in self.data
def is_storage_policy_encrypted(self):
"""If the document's storagePolicy is set to encrypted return True.
False otherwise."""
return STORAGE_POLICY in self._embedded_document[METADATA] \
and ENCRYPTED in self._embedded_document[METADATA][STORAGE_POLICY]
def set_encrypted(self, author):
"""Mark the pegleg managed document as encrypted."""
self.data[ENCRYPTED] = {
'at': datetime.utcnow().isoformat(),
'by': author,
}
def set_decrypted(self):
"""Mark the pegleg managed document as un-encrypted."""
self.data.pop(ENCRYPTED)
def set_secret(self, secret):
self._embedded_document['data'] = secret
def get_secret(self):
return self._embedded_document.get('data')

View File

@ -0,0 +1,137 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import yaml
import sys
import re
import click
from pegleg.engine.util.encryption import encrypt
from pegleg.engine.util.encryption import decrypt
from pegleg.engine.util.pegleg_managed_document import \
PeglegManagedSecretsDocument as PeglegManagedSecret
from pegleg.engine.util import files
LOG = logging.getLogger(__name__)
PASSPHRASE_PATTERN = '^.{24,}$'
ENV_PASSPHRASE = 'PEGLEG_PASSPHRASE'
ENV_SALT = 'PEGLEG_SALT'
class PeglegSecretManagement():
"""An object to handle operations on of a pegleg managed file."""
def __init__(self, file_path):
"""
Read the source file and the environment data needed to wrap and
process the file documents as pegleg managed document.
"""
self.__check_environment()
self.file_path = file_path
self.documents = list()
for doc in files.read(file_path):
self.documents.append(PeglegManagedSecret(doc))
self.passphrase = os.environ.get(ENV_PASSPHRASE).encode()
self.salt = os.environ.get(ENV_SALT).encode()
@staticmethod
def __check_environment():
"""
Validate required environment variables for encryption or decryption.
:return None
:raises click.ClickException: If environment validation should fail.
"""
# Verify that passphrase environment variable is defined and is longer
# than 24 characters.
if not os.environ.get(ENV_PASSPHRASE) or not re.match(
PASSPHRASE_PATTERN, os.environ.get(ENV_PASSPHRASE)):
raise click.ClickException(
'Environment variable {} is not defined or '
'is not at least 24-character long.'.format(ENV_PASSPHRASE))
if not os.environ.get(ENV_SALT):
raise click.ClickException(
'Environment variable {} is not defined or '
'is an empty string.'.format(ENV_SALT))
def encrypt_secrets(self, save_path, author):
"""
Wrap and encrypt the secrets documents included in the input file,
into pegleg manage secrets documents, and write the result in
save_path.
if save_path is the same as the source file_path the encrypted file
will overwrite the source file.
:param save_path: Destination path of the encrypted file
:type save_path: string
:param author: Identifier for the program or person who is
encrypting the secrets documents
:type author: string
"""
encrypted_docs = False
doc_list = []
for doc in self.documents:
# do not re-encrypt already encrypted data
if doc.is_encrypted():
continue
# only encrypt if storagePolicy is set to encrypted.
if not doc.is_storage_policy_encrypted():
# case documents in a file have different storage
# policies
doc_list.append(doc.embedded_document)
continue
doc.set_secret(
encrypt(doc.get_secret().encode(), self.passphrase, self.salt))
doc.set_encrypted(author)
encrypted_docs = True
doc_list.append(doc.pegleg_document)
if encrypted_docs:
files.write(save_path, doc_list)
LOG.info('Wrote data to: {}.'.format(save_path))
else:
LOG.debug('All documents in file: {} are either already encrypted '
'or have cleartext storage policy. '
'Skipping.'.format(self.file_path))
def decrypt_secrets(self):
"""Decrypt and unwrap pegleg managed encrypted secrets documents
included in a site secrets file, and print the result to the standard
out."""
doc_list = []
for doc in self.documents:
# only decrypt an encrypted document
if doc.is_encrypted():
doc.set_secret(
decrypt(doc.get_secret(),
self.passphrase,
self.salt).decode())
doc.set_decrypted()
doc_list.append(doc.embedded_document)
yaml.safe_dump_all(
doc_list,
sys.stdout,
explicit_start=True,
explicit_end=True,
default_flow_style=False)

View File

@ -2,4 +2,5 @@ gitpython
click==6.7
jsonschema==2.6.0
pyyaml==3.12
cryptography==2.3.1
git+https://github.com/openstack/airship-deckhand.git@7d697012fcbd868b14670aa9cf895acfad5a7f8d

View File

@ -0,0 +1,94 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import click
import os
import tempfile
import mock
import pytest
import yaml
from pegleg.engine.util import encryption as crypt
from tests.unit import test_utils
from pegleg.engine import secrets
from pegleg.engine.util.pegleg_managed_document import \
PeglegManagedSecretsDocument
from pegleg.engine.util.pegleg_secret_management import PeglegSecretManagement
from pegleg.engine.util.pegleg_secret_management import ENV_PASSPHRASE
from pegleg.engine.util.pegleg_secret_management import ENV_SALT
TEST_DATA = """
---
schema: deckhand/Passphrase/v1
metadata:
schema: metadata/Document/v1
name: osh_addons_keystone_ranger-agent_password
layeringDefinition:
abstract: false
layer: site
storagePolicy: encrypted
data: 512363f37eab654313991174aef9f867d
...
"""
def test_encrypt_and_decrypt():
data = test_utils.rand_name("this is an example of un-encrypted "
"data.", "pegleg").encode()
passphrase = test_utils.rand_name("passphrase1", "pegleg").encode()
salt = test_utils.rand_name("salt1", "pegleg").encode()
enc1 = crypt.encrypt(data, passphrase, salt)
dec1 = crypt.decrypt(enc1, passphrase, salt)
assert data == dec1
enc2 = crypt.encrypt(dec1, passphrase, salt)
dec2 = crypt.decrypt(enc2, passphrase, salt)
assert data == dec2
@mock.patch.dict(os.environ, {ENV_PASSPHRASE:'aShortPassphrase',
ENV_SALT: 'MySecretSalt'})
def test_short_passphrase():
with pytest.raises(click.ClickException,
match=r'.*is not at least 24-character long.*'):
PeglegSecretManagement('file_path')
def test_PeglegManagedDocument():
test_data = yaml.load(TEST_DATA)
doc = PeglegManagedSecretsDocument(test_data)
assert doc.is_storage_policy_encrypted() is True
assert doc.is_encrypted() is False
@mock.patch.dict(os.environ, {ENV_PASSPHRASE:'ytrr89erARAiPE34692iwUMvWqqBvC',
ENV_SALT: 'MySecretSalt'})
def test_encrypt_document():
# write the test data to temp file
test_data = yaml.load(TEST_DATA)
dir = tempfile.mkdtemp()
file_path = os.path.join(dir, 'secrets_file.yaml')
save_path = os.path.join(dir, 'encrypted_secrets_file.yaml')
with open(file_path, 'w') as stream:
yaml.dump(test_data,
stream,
explicit_start=True,
explicit_end=True,
default_flow_style=False)
# read back the secrets data file and encrypt it
doc_mgr = PeglegSecretManagement(file_path)
doc_mgr.encrypt_secrets(save_path, 'test_author')
doc = doc_mgr.documents[0]
assert doc.is_encrypted()
assert doc.data['encrypted']['by'] == 'test_author'

View File

@ -20,5 +20,7 @@ docker run --rm $TERM_OPTS \
--workdir="$container_workspace_path" \
-v "${HOME}/.ssh:${container_workspace_path}/.ssh" \
-v "${WORKSPACE}:$container_workspace_path" \
-e "PEGLEG_PASSPHRASE=$PEGLEG_PASSPHRASE" \
-e "PEGLEG_SALT=$PEGLEG_SALT" \
"${IMAGE}" \
pegleg "${@}"

View File

@ -9,7 +9,7 @@ skipsdist = True
setenv = VIRTUAL_ENV={envdir}
LANGUAGE=en_US
LC_ALL=en_US.utf-8
passenv = http_proxy HTTP_PROXY https_proxy HTTPS_PROXY no_proxy NO_PROXY
passenv = http_proxy https_proxy HTTP_PROXY HTTPS_PROXY
deps =
-r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt