# Copyright 2018 AT&T Intellectual Property.  All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime
import json
import logging
import os
# Ignore bandit false positive: B404:blacklist
# The purpose of this module is to safely encapsulate calls via fork.
import subprocess  # nosec
import tempfile

from dateutil import parser
import pytz
import yaml

from pegleg.engine.util.pegleg_managed_document import \
    PeglegManagedSecretsDocument

LOG = logging.getLogger(__name__)
_ONE_YEAR_IN_HOURS = '8760h'  # 365 * 24

__all__ = ['PKIUtility']


# TODO(felipemonteiro): Create an abstract base class for other future Catalog
# classes.


class PKIUtility(object):
    """Public Key Infrastructure utility class.

    Responsible for generating certificate and CA documents using ``cfssl`` and
    keypairs using ``openssl``. These secrets are all wrapped in instances
    of ``pegleg/PeglegManagedDocument/v1``.

    """

    @staticmethod
    def cfssl_exists():
        """Checks whether cfssl command exists. Useful for testing."""
        try:
            subprocess.check_output(  # nosec
                ['which', 'cfssl'], stderr=subprocess.STDOUT)
            return True
        except subprocess.CalledProcessError:
            return False

    def __init__(self, *, block_strings=True):
        """Initialize the utility.

        :param bool block_strings: Whether generated secret data is wrapped
            so that it is dumped as a block-style YAML string. Defaults to
            true.
        """
        self.block_strings = block_strings
        # Lazily built by the ``ca_config`` property.
        self._ca_config_string = None

    @property
    def ca_config(self):
        """JSON string holding the ``cfssl`` signing configuration.

        The string is built on first access and cached on the instance.

        :returns: JSON-encoded signing configuration.
        :rtype: str
        """
        if not self._ca_config_string:
            self._ca_config_string = json.dumps({
                'signing': {
                    'default': {
                        # TODO(felipemonteiro): Make this configurable.
                        'expiry': _ONE_YEAR_IN_HOURS,
                        'usages': [
                            'signing', 'key encipherment', 'server auth',
                            'client auth'
                        ],
                    },
                },
            })
        return self._ca_config_string

    def generate_ca(self, ca_name):
        """Generate CA cert and associated key.

        :param str ca_name: Name of Certificate Authority in wrapped document.
        :returns: Tuple of (wrapped CA cert, wrapped CA key)
        :rtype: tuple[dict, dict]

        """

        result = self._cfssl(
            ['gencert', '-initca', 'csr.json'],
            files={
                'csr.json': self.csr(name=ca_name),
            })

        return (self._wrap_ca(ca_name, result['cert']),
                self._wrap_ca_key(ca_name, result['key']))

    def generate_keypair(self, name):
        """Generate keypair.

        :param str name: Name of keypair in wrapped document.
        :returns: Tuple of (wrapped public key, wrapped private key)
        :rtype: tuple[dict, dict]

        """

        priv_result = self._openssl(['genrsa', '-out', 'priv.pem'])
        # The private key is staged back into a fresh working directory so
        # that openssl can derive the public key from it.
        pub_result = self._openssl(
            ['rsa', '-in', 'priv.pem', '-pubout', '-out', 'pub.pem'],
            files={
                'priv.pem': priv_result['priv.pem'],
            })

        return (self._wrap_pub_key(name, pub_result['pub.pem']),
                self._wrap_priv_key(name, priv_result['priv.pem']))

    def generate_certificate(self,
                             name,
                             *,
                             ca_cert,
                             ca_key,
                             cn,
                             groups=None,
                             hosts=None):
        """Generate certificate and associated key given CA cert and key.

        :param str name: Name of certificate in wrapped document.
        :param str ca_cert: CA certificate.
        :param str ca_key: CA certificate key.
        :param str cn: Common name associated with certificate.
        :param list groups: List of groups associated with certificate.
        :param list hosts: List of hosts associated with certificate.
        :returns: Tuple of (wrapped certificate, wrapped certificate key)
        :rtype: tuple[dict, dict]

        """

        if groups is None:
            groups = []
        if hosts is None:
            hosts = []

        result = self._cfssl(
            [
                'gencert', '-ca', 'ca.pem', '-ca-key', 'ca-key.pem', '-config',
                'ca-config.json', 'csr.json'
            ],
            files={
                'ca-config.json': self.ca_config,
                'ca.pem': ca_cert,
                'ca-key.pem': ca_key,
                'csr.json': self.csr(name=cn, groups=groups, hosts=hosts),
            })

        return (self._wrap_cert(name, result['cert']),
                self._wrap_cert_key(name, result['key']))

    def csr(self, *, name, groups=None, hosts=None, key=None):
        """Build a JSON certificate signing request for ``cfssl``.

        :param str name: Value of the ``CN`` field.
        :param list groups: Groups; each one becomes an ``{'O': group}``
            entry under ``names``. Defaults to an empty list.
        :param list hosts: Value of the ``hosts`` field. Defaults to an
            empty list.
        :param dict key: Value of the ``key`` field. Defaults to
            ``{'algo': 'rsa', 'size': 2048}``.
        :returns: JSON-encoded signing request.
        :rtype: str
        """
        if groups is None:
            groups = []
        if hosts is None:
            hosts = []
        if key is None:
            # Built per call so that no mutable default is shared between
            # invocations.
            key = {'algo': 'rsa', 'size': 2048}

        return json.dumps({
            'CN': name,
            'key': key,
            'hosts': hosts,
            'names': [{
                'O': g
            } for g in groups],
        })

    def cert_info(self, cert):
        """Retrieve certificate info via ``cfssl``.

        :param str cert: Client certificate that contains the public key.
        :returns: Information related to certificate.
        :rtype: dict

        """

        return self._cfssl(
            ['certinfo', '-cert', 'cert.pem'], files={
                'cert.pem': cert,
            })

    def check_expiry(self, cert):
        """Check whether a given certificate is expired.

        :param str cert: Client certificate that contains the public key.
        :returns: True if certificate is expired, else False.
        :rtype: bool

        """

        info = self.cert_info(cert)
        expiry_str = info['not_after']
        expiry = parser.parse(expiry_str)
        # expiry is timezone-aware; do the same for `now`.
        now = datetime.now(pytz.utc)
        return now > expiry

    @staticmethod
    def _write_input_files(directory, files):
        """Write each ``filename -> data`` entry of ``files`` into directory.

        :param str directory: Directory in which the files are created.
        :param dict files: Mapping of file name to the text to write.
        """
        for filename, data in files.items():
            with open(os.path.join(directory, filename), 'w') as f:
                f.write(data)

    def _cfssl(self, command, *, files=None):
        """Executes ``cfssl`` command via ``subprocess`` call.

        :param list command: Arguments appended after ``cfssl``.
        :param dict files: Mapping of file name to contents; the files are
            written into the temporary working directory before the call.
        :returns: The command's standard output parsed as JSON.
        :raises subprocess.CalledProcessError: If the command exits with a
            non-zero status.
        """
        if not files:
            files = {}
        with tempfile.TemporaryDirectory() as tmp:
            self._write_input_files(tmp, files)

            # Ignore bandit false positive:
            #   B603:subprocess_without_shell_equals_true
            # This method wraps cfssl calls originating from this module.
            result = subprocess.check_output(  # nosec
                ['cfssl'] + command, cwd=tmp, stderr=subprocess.PIPE)
            if not isinstance(result, str):
                result = result.decode('utf-8')
            return json.loads(result)

    def _openssl(self, command, *, files=None):
        """Executes ``openssl`` command via ``subprocess`` call.

        :param list command: Arguments appended after ``openssl``.
        :param dict files: Mapping of file name to contents; the files are
            written into the temporary working directory before the call.
        :returns: Mapping of file name to contents for every file found in
            the working directory afterwards that was not one of the inputs.
        :rtype: dict
        :raises subprocess.CalledProcessError: If the command exits with a
            non-zero status.
        """
        if not files:
            files = {}

        with tempfile.TemporaryDirectory() as tmp:
            self._write_input_files(tmp, files)

            # Ignore bandit false positive:
            #   B603:subprocess_without_shell_equals_true
            # This method wraps openssl calls originating from this module.
            #
            # ``subprocess.run`` is used rather than ``check_call`` because
            # stderr is piped: ``run`` drains the pipe while waiting, whereas
            # ``check_call`` never reads it and the child could block once
            # the pipe buffer is full.
            subprocess.run(  # nosec
                ['openssl'] + command,
                cwd=tmp,
                stderr=subprocess.PIPE,
                check=True)

            result = {}
            for filename in os.listdir(tmp):
                if filename not in files:
                    with open(os.path.join(tmp, filename)) as f:
                        result[filename] = f.read()

            return result

    def _wrap_ca(self, name, data):
        """Wrap ``data`` as a ``CertificateAuthority`` document."""
        return self.wrap_document(kind='CertificateAuthority',
                                  name=name,
                                  data=data,
                                  block_strings=self.block_strings)

    def _wrap_ca_key(self, name, data):
        """Wrap ``data`` as a ``CertificateAuthorityKey`` document."""
        return self.wrap_document(kind='CertificateAuthorityKey',
                                  name=name,
                                  data=data,
                                  block_strings=self.block_strings)

    def _wrap_cert(self, name, data):
        """Wrap ``data`` as a ``Certificate`` document."""
        return self.wrap_document(kind='Certificate',
                                  name=name,
                                  data=data,
                                  block_strings=self.block_strings)

    def _wrap_cert_key(self, name, data):
        """Wrap ``data`` as a ``CertificateKey`` document."""
        return self.wrap_document(kind='CertificateKey',
                                  name=name,
                                  data=data,
                                  block_strings=self.block_strings)

    def _wrap_priv_key(self, name, data):
        """Wrap ``data`` as a ``PrivateKey`` document."""
        return self.wrap_document(kind='PrivateKey',
                                  name=name,
                                  data=data,
                                  block_strings=self.block_strings)

    def _wrap_pub_key(self, name, data):
        """Wrap ``data`` as a ``PublicKey`` document."""
        return self.wrap_document(kind='PublicKey',
                                  name=name,
                                  data=data,
                                  block_strings=self.block_strings)

    @staticmethod
    def wrap_document(kind, name, data, block_strings=True):
        """Wrap document ``data`` with PeglegManagedDocument pattern.

        :param str kind: The kind of document (found in ``schema``).
        :param str name: Name of the document.
        :param data: Document data; converted to a ``block_literal`` string
            when ``block_strings`` is true, otherwise used unchanged.
        :param bool block_strings: Whether to dump out certificate data as
            block-style YAML string. Defaults to true.
        :return: the wrapped document
        :rtype: dict

        """

        wrapped_schema = 'deckhand/%s/v1' % kind
        wrapped_metadata = {
            'schema': 'metadata/Document/v1',
            'name': name,
            'layeringDefinition': {
                'abstract': False,
                'layer': 'site',
            }
        }
        wrapped_data = PKIUtility._block_literal(
            data, block_strings=block_strings)

        document = {
            "schema": wrapped_schema,
            "metadata": wrapped_metadata,
            "data": wrapped_data
        }

        return PeglegManagedSecretsDocument(document).pegleg_document

    @staticmethod
    def _block_literal(data, block_strings=True):
        """Return ``data`` as a ``block_literal`` if ``block_strings`` is set.

        :param data: Value to wrap.
        :param bool block_strings: When false, ``data`` is returned as is.
        """
        if block_strings:
            return block_literal(data)
        else:
            return data


class block_literal(str):
    """String subclass used to select the block-style YAML representer."""


def block_literal_representer(dumper, data):
    """Represent ``data`` as a YAML string scalar using the ``|`` style."""
    return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')


yaml.add_representer(block_literal, block_literal_representer)