-rw-r--r--tests/unit/engine/catalog/__init__.py0
-rw-r--r--tests/unit/engine/catalog/test_pki_utility.py175
2 files changed, 175 insertions, 0 deletions
diff --git a/tests/unit/engine/catalog/__init__.py b/tests/unit/engine/catalog/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/unit/engine/catalog/__init__.py
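--- /dev/null
+++ b/tests/unit/engine/catalog/test_pki_utility.py
@@ -0,0 +1,175 @@
+# Copyright 2019 AT&T Intellectual Property. All other rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import time
+
+import click
+import mock
+import pytest
+
+from pegleg import config
+from pegleg.engine.catalog import pki_utility
+from pegleg.engine.common import managed_document
+
+CERT_HEADER = '-----BEGIN CERTIFICATE-----\n'
+CERT_KEY_HEADER = '-----BEGIN RSA PRIVATE KEY-----\n'
+PUBLIC_KEY_HEADER = '-----BEGIN PUBLIC KEY-----\n'
+PRIVATE_KEY_HEADER = '-----BEGIN RSA PRIVATE KEY-----\n'
+
+PEGLEG_MANAGED_DOC_SCHEMA = 'pegleg/PeglegManagedDocument/v1'
+CA_SCHEMA = 'deckhand/CertificateAuthority/v1'
+CA_KEY_SCHEMA = 'deckhand/CertificateAuthorityKey/v1'
+CERT_SCHEMA = 'deckhand/Certificate/v1'
+CERT_KEY_SCHEMA = 'deckhand/CertificateKey/v1'
+PUBLIC_KEY_SCHEMA = 'deckhand/PublicKey/v1'
+PRIVATE_KEY_SCHEMA = 'deckhand/PrivateKey/v1'
+
+
+@pytest.mark.skipif(
+    not pki_utility.PKIUtility.cfssl_exists(),
+    reason='cfssl must be installed to execute these tests')
+class TestPKIUtility(object):
+    @classmethod
+    def setup_class(cls):
+        mock.patch.object(
+            managed_document,
+            '_get_repo_url_and_rev',
+            new=lambda: ('fake://github.com/nothing.git', 'master')).start()
+
+    def test_generate_ca(self):
+        pki_obj = pki_utility.PKIUtility()
+        ca_cert_wrapper, ca_key_wrapper = pki_obj.generate_ca(
+            self.__class__.__name__)
+
+        assert 'pegleg/PeglegManagedDocument/v1' == ca_cert_wrapper['schema']
+        assert 'pegleg/PeglegManagedDocument/v1' == ca_key_wrapper['schema']
+
+        ca_cert = ca_cert_wrapper['data']['managedDocument']
+        assert isinstance(ca_cert, dict), ca_cert
+        ca_key = ca_key_wrapper['data']['managedDocument']
+        assert isinstance(ca_key, dict), ca_key
+
+        assert isinstance(ca_cert, dict), ca_cert
+        assert CA_SCHEMA in ca_cert['schema']
+        assert CERT_HEADER in ca_cert['data']
+
+        assert isinstance(ca_key, dict), ca_key
+        assert CA_KEY_SCHEMA in ca_key['schema']
+        assert CERT_KEY_HEADER in ca_key['data']
+
+    def test_generate_keypair(self):
+        pki_obj = pki_utility.PKIUtility()
+        pub_key_wrapper, priv_key_wrapper = pki_obj.generate_keypair(
+            self.__class__.__name__)
+
+        assert 'pegleg/PeglegManagedDocument/v1' == pub_key_wrapper['schema']
+        assert 'pegleg/PeglegManagedDocument/v1' == priv_key_wrapper['schema']
+
+        pub_key = pub_key_wrapper['data']['managedDocument']
+        assert isinstance(pub_key, dict), pub_key
+        priv_key = priv_key_wrapper['data']['managedDocument']
+        assert isinstance(pub_key, dict), priv_key
+
+        assert isinstance(pub_key, dict), pub_key
+        assert PUBLIC_KEY_SCHEMA in pub_key['schema']
+        assert PUBLIC_KEY_HEADER in pub_key['data']
+
+        assert isinstance(priv_key, dict), priv_key
+        assert PRIVATE_KEY_SCHEMA in priv_key['schema']
+        assert PRIVATE_KEY_HEADER in priv_key['data']
+
+    def test_generate_certificate(self):
+        pki_obj = pki_utility.PKIUtility()
+        ca_cert_wrapper, ca_key_wrapper = pki_obj.generate_ca(
+            self.__class__.__name__)
+        ca_cert = ca_cert_wrapper['data']['managedDocument']
+        ca_key = ca_key_wrapper['data']['managedDocument']
+
+        cert_wrapper, cert_key_wrapper = pki_obj.generate_certificate(
+            name=self.__class__.__name__,
+            ca_cert=ca_cert['data'],
+            ca_key=ca_key['data'],
+            cn='admin')
+
+        assert 'pegleg/PeglegManagedDocument/v1' == cert_wrapper['schema']
+        assert 'pegleg/PeglegManagedDocument/v1' == cert_key_wrapper['schema']
+
+        cert = cert_wrapper['data']['managedDocument']
+        assert isinstance(cert, dict), cert
+        cert_key = cert_key_wrapper['data']['managedDocument']
+        assert isinstance(cert_key, dict), cert_key
+
+        assert isinstance(cert, dict), cert
+        assert CERT_SCHEMA in cert['schema']
+        assert CERT_HEADER in cert['data']
+
+        assert isinstance(cert_key, dict), cert_key
+        assert CERT_KEY_SCHEMA in cert_key['schema']
+        assert CERT_KEY_HEADER in cert_key['data']
+
+    def test_check_expiry_is_expired_false(self):
+        """Check that ``check_expiry`` returns False if cert isn't expired."""
+        pki_obj = pki_utility.PKIUtility()
+
+        ca_config = json.loads(pki_obj.ca_config)
+        ca_config['signing']['default']['expiry'] = '1h'
+
+        m_callable = mock.PropertyMock(return_value=json.dumps(ca_config))
+        with mock.patch.object(
+                pki_utility.PKIUtility, 'ca_config', new_callable=m_callable):
+            ca_cert_wrapper, ca_key_wrapper = pki_obj.generate_ca(
+                self.__class__.__name__)
+            ca_cert = ca_cert_wrapper['data']['managedDocument']
+            ca_key = ca_key_wrapper['data']['managedDocument']
+            cert_wrapper, _ = pki_obj.generate_certificate(
+                name=self.__class__.__name__,
+                ca_cert=ca_cert['data'],
+                ca_key=ca_key['data'],
+                cn='admin')
+            cert = cert_wrapper['data']['managedDocument']
+
+        # Validate that the cert hasn't expired.
+        is_expired = pki_obj.check_expiry(cert=cert['data'])
+        assert not is_expired
+
+    def test_check_expiry_is_expired_true(self):
+        """Check that ``check_expiry`` returns True is cert is expired.
+
+        Second values are used to demonstrate precision down to the second.
+        """
+        pki_obj = pki_utility.PKIUtility()
+
+        ca_config = json.loads(pki_obj.ca_config)
+        ca_config['signing']['default']['expiry'] = '1s'
+
+        m_callable = mock.PropertyMock(return_value=json.dumps(ca_config))
+        with mock.patch.object(
+                pki_utility.PKIUtility, 'ca_config', new_callable=m_callable):
+            ca_cert_wrapper, ca_key_wrapper = pki_obj.generate_ca(
+                self.__class__.__name__)
+            ca_cert = ca_cert_wrapper['data']['managedDocument']
+            ca_key = ca_key_wrapper['data']['managedDocument']
+            cert_wrapper, _ = pki_obj.generate_certificate(
+                name=self.__class__.__name__,
+                ca_cert=ca_cert['data'],
+                ca_key=ca_key['data'],
+                cn='admin')
+            cert = cert_wrapper['data']['managedDocument']
+
+        time.sleep(2)
+
+        # Validate that the cert has expired.
+        is_expired = pki_obj.check_expiry(cert=cert['data'])
+        assert is_expired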
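Review bot: At gutter line 83 in test_generate_keypair the assertion checks the wrong object — `assert isinstance(pub_key, dict), priv_key` repeats the pub_key check and never validates priv_key; it should read `assert isinstance(priv_key, dict), priv_key`, after which the duplicated isinstance asserts at lines 85/89 (and 64/68, 114/118 in the sibling tests) can be dropped. setup_class starts a patch on managed_document._get_repo_url_and_rev with `.start()` but nothing ever calls `stop()`, so the fake repo URL leaks into every test module collected after this one; keep the patcher as `cls._patcher = mock.patch.object(...)`, call `cls._patcher.start()`, and add a `teardown_class` that calls `cls._patcher.stop()`. `import click` and `from pegleg import config` are unused, and the docstring at line 148 should say 'if cert is expired' rather than 'is cert is expired'.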